surfpool_core/rpc/
ws.rs

1use std::{
2    collections::HashMap,
3    str::FromStr,
4    sync::{Arc, RwLock, atomic},
5};
6
7use jsonrpc_core::{Error, ErrorCode, Result};
8use jsonrpc_derive::rpc;
9use jsonrpc_pubsub::{
10    SubscriptionId,
11    typed::{Sink, Subscriber},
12};
13use solana_account_decoder::{UiAccount, UiAccountEncoding};
14use solana_client::{
15    rpc_config::{RpcSignatureSubscribeConfig, RpcTransactionConfig, RpcTransactionLogsFilter},
16    rpc_response::{
17        ProcessedSignatureResult, ReceivedSignatureResult, RpcLogsResponse, RpcResponseContext,
18        RpcSignatureResult,
19    },
20};
21use solana_commitment_config::{CommitmentConfig, CommitmentLevel};
22use solana_pubkey::Pubkey;
23use solana_rpc_client_api::response::{Response as RpcResponse, SlotInfo};
24use solana_signature::Signature;
25use solana_transaction_status::TransactionConfirmationStatus;
26
27use super::{State, SurfnetRpcContext, SurfpoolWebsocketMeta};
28use crate::surfnet::{GetTransactionResult, SignatureSubscriptionType};
29
30/// Configuration for account subscription requests.
31///
32/// This struct defines the parameters that clients can specify when subscribing
33/// to account change notifications through WebSocket connections. It allows customization
34/// of both the commitment level for updates and the encoding format for account data.
35///
36/// ## Fields
37/// - `commitment`: Optional commitment configuration specifying when to send notifications
38///   (processed, confirmed, or finalized). Defaults to the node's default commitment level.
39/// - `encoding`: Optional encoding format for account data serialization (base58, base64, jsonParsed, etc.).
40///   Defaults to base58 encoding if not specified.
41///
42/// ## Usage
43/// Clients can provide this configuration to customize their subscription behavior:
44/// - Set commitment level to control notification timing based on confirmation status
45/// - Set encoding to specify the preferred format for receiving account data
46///
47/// ## Example Usage
48/// ```json
49/// {
50///   "commitment": "confirmed",
51///   "encoding": "base64"
52/// }
53/// ```
54#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
55#[serde(rename_all = "camelCase")]
56pub struct RpcAccountSubscribeConfig {
57    #[serde(flatten)]
58    pub commitment: Option<CommitmentConfig>,
59    pub encoding: Option<UiAccountEncoding>,
60}
61
62#[rpc]
63pub trait Rpc {
64    type Metadata;
65
66    /// Subscribe to signature status notifications via WebSocket.
67    ///
68    /// This method allows clients to subscribe to status updates for a specific transaction signature.
69    /// The subscriber will receive notifications when the transaction reaches the desired confirmation level
70    /// or when it's initially received by the network (if configured).
71    ///
72    /// ## Parameters
73    /// - `meta`: WebSocket metadata containing RPC context and connection information.
74    /// - `subscriber`: The subscription sink for sending signature status notifications to the client.
75    /// - `signature_str`: The transaction signature to monitor, as a base-58 encoded string.
76    /// - `config`: Optional configuration specifying commitment level and notification preferences.
77    ///
78    /// ## Returns
79    /// This method does not return a value directly. Instead, it establishes a WebSocket subscription
80    /// that will send `RpcResponse<RpcSignatureResult>` notifications to the subscriber when the
81    /// transaction status changes.
82    ///
83    /// ## Example WebSocket Request
84    /// ```json
85    /// {
86    ///   "jsonrpc": "2.0",
87    ///   "id": 1,
88    ///   "method": "signatureSubscribe",
89    ///   "params": [
90    ///     "2id3YC2jK9G5Wo2phDx4gJVAew8DcY5NAojnVuao8rkxwPYPe8cSwE5GzhEgJA2y8fVjDEo6iR6ykBvDxrTQrtpb",
91    ///     {
92    ///       "commitment": "finalized",
93    ///       "enableReceivedNotification": false
94    ///     }
95    ///   ]
96    /// }
97    /// ```
98    ///
99    /// ## Example WebSocket Response (Subscription Confirmation)
100    /// ```json
101    /// {
102    ///   "jsonrpc": "2.0",
103    ///   "result": 0,
104    ///   "id": 1
105    /// }
106    /// ```
107    ///
108    /// ## Example WebSocket Notification
109    /// ```json
110    /// {
111    ///   "jsonrpc": "2.0",
112    ///   "method": "signatureNotification",
113    ///   "params": {
114    ///     "result": {
115    ///       "context": {
116    ///         "slot": 5207624
117    ///       },
118    ///       "value": {
119    ///         "err": null
120    ///       }
121    ///     },
122    ///     "subscription": 0
123    ///   }
124    /// }
125    /// ```
126    ///
127    /// ## Notes
128    /// - If the transaction already exists with the desired confirmation status, the subscriber
129    ///   will be notified immediately and the subscription will complete.
130    /// - The subscription automatically terminates after sending the first matching notification.
131    /// - Invalid signature formats will cause the subscription to be rejected with an error.
132    /// - Each subscription runs in its own async task for optimal performance.
133    ///
134    /// ## See Also
135    /// - `signatureUnsubscribe`: Remove an active signature subscription
136    /// - `getSignatureStatuses`: Get current status of multiple signatures
137    #[pubsub(
138        subscription = "signatureNotification",
139        subscribe,
140        name = "signatureSubscribe"
141    )]
142    fn signature_subscribe(
143        &self,
144        meta: Self::Metadata,
145        subscriber: Subscriber<RpcResponse<RpcSignatureResult>>,
146        signature_str: String,
147        config: Option<RpcSignatureSubscribeConfig>,
148    );
149
150    /// Unsubscribe from signature status notifications.
151    ///
152    /// This method removes an active signature subscription, stopping further notifications
153    /// for the specified subscription ID.
154    ///
155    /// ## Parameters
156    /// - `meta`: Optional WebSocket metadata containing connection information.
157    /// - `subscription`: The subscription ID to remove, as returned by `signatureSubscribe`.
158    ///
159    /// ## Returns
160    /// A `Result<bool>` indicating whether the unsubscription was successful:
161    /// - `Ok(true)` if the subscription was successfully removed
162    /// - `Err(Error)` with `InvalidParams` if the subscription ID doesn't exist
163    ///
164    /// ## Example WebSocket Request
165    /// ```json
166    /// {
167    ///   "jsonrpc": "2.0",
168    ///   "id": 1,
169    ///   "method": "signatureUnsubscribe",
170    ///   "params": [0]
171    /// }
172    /// ```
173    ///
174    /// ## Example WebSocket Response
175    /// ```json
176    /// {
177    ///   "jsonrpc": "2.0",
178    ///   "result": true,
179    ///   "id": 1
180    /// }
181    /// ```
182    ///
183    /// ## Notes
184    /// - Attempting to unsubscribe from a non-existent subscription will return an error.
185    /// - Successfully unsubscribed connections will no longer receive notifications.
186    /// - This method is thread-safe and can be called concurrently.
187    ///
188    /// ## See Also
189    /// - `signatureSubscribe`: Create a signature status subscription
190    #[pubsub(
191        subscription = "signatureNotification",
192        unsubscribe,
193        name = "signatureUnsubscribe"
194    )]
195    fn signature_unsubscribe(
196        &self,
197        meta: Option<Self::Metadata>,
198        subscription: SubscriptionId,
199    ) -> Result<bool>;
200
201    /// Subscribe to account change notifications via WebSocket.
202    ///
203    /// This method allows clients to subscribe to updates for a specific account.
204    /// The subscriber will receive notifications whenever the account's data, lamports balance,
205    /// ownership, or other properties change.
206    ///
207    /// ## Parameters
208    /// - `meta`: WebSocket metadata containing RPC context and connection information.
209    /// - `subscriber`: The subscription sink for sending account update notifications to the client.
210    /// - `pubkey_str`: The account public key to monitor, as a base-58 encoded string.
211    /// - `config`: Optional configuration specifying commitment level and encoding format for account data.
212    ///
213    /// ## Returns
214    /// This method does not return a value directly. Instead, it establishes a continuous WebSocket
215    /// subscription that will send `RpcResponse<UiAccount>` notifications to the subscriber whenever
216    /// the account state changes.
217    ///
218    /// ## Example WebSocket Request
219    /// ```json
220    /// {
221    ///   "jsonrpc": "2.0",
222    ///   "id": 1,
223    ///   "method": "accountSubscribe",
224    ///   "params": [
225    ///     "CM78CPUeXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNH12",
226    ///     {
227    ///       "commitment": "finalized",
228    ///       "encoding": "base64"
229    ///     }
230    ///   ]
231    /// }
232    /// ```
233    ///
234    /// ## Example WebSocket Response (Subscription Confirmation)
235    /// ```json
236    /// {
237    ///   "jsonrpc": "2.0",
238    ///   "result": 23784,
239    ///   "id": 1
240    /// }
241    /// ```
242    ///
243    /// ## Example WebSocket Notification
244    /// ```json
245    /// {
246    ///   "jsonrpc": "2.0",
247    ///   "method": "accountNotification",
248    ///   "params": {
249    ///     "result": {
250    ///       "context": {
251    ///         "slot": 5208469
252    ///       },
253    ///       "value": {
254    ///         "data": ["base64EncodedAccountData", "base64"],
255    ///         "executable": false,
256    ///         "lamports": 33594,
257    ///         "owner": "11111111111111111111111111111112",
258    ///         "rentEpoch": 636
259    ///       }
260    ///     },
261    ///     "subscription": 23784
262    ///   }
263    /// }
264    /// ```
265    ///
266    /// ## Notes
267    /// - The subscription remains active until explicitly unsubscribed or the connection is closed.
268    /// - Account notifications are sent whenever any aspect of the account changes.
269    /// - The encoding format specified in the config determines how account data is serialized.
270    /// - Invalid public key formats will cause the subscription to be rejected with an error.
271    /// - Each subscription runs in its own async task to ensure optimal performance.
272    ///
273    /// ## See Also
274    /// - `accountUnsubscribe`: Remove an active account subscription
275    /// - `getAccountInfo`: Get current account information
276    #[pubsub(
277        subscription = "accountNotification",
278        subscribe,
279        name = "accountSubscribe"
280    )]
281    fn account_subscribe(
282        &self,
283        meta: Self::Metadata,
284        subscriber: Subscriber<RpcResponse<UiAccount>>,
285        pubkey_str: String,
286        config: Option<RpcAccountSubscribeConfig>,
287    );
288
289    /// Unsubscribe from account change notifications.
290    ///
291    /// This method removes an active account subscription, stopping further notifications
292    /// for the specified subscription ID. The monitoring task will automatically terminate
293    /// when the subscription is removed.
294    ///
295    /// ## Parameters
296    /// - `meta`: Optional WebSocket metadata containing connection information.
297    /// - `subscription`: The subscription ID to remove, as returned by `accountSubscribe`.
298    ///
299    /// ## Returns
300    /// A `Result<bool>` indicating whether the unsubscription was successful:
301    /// - `Ok(true)` if the subscription was successfully removed
302    /// - `Err(Error)` with `InvalidParams` if the subscription ID doesn't exist
303    ///
304    /// ## Example WebSocket Request
305    /// ```json
306    /// {
307    ///   "jsonrpc": "2.0",
308    ///   "id": 1,
309    ///   "method": "accountUnsubscribe",
310    ///   "params": [23784]
311    /// }
312    /// ```
313    ///
314    /// ## Example WebSocket Response
315    /// ```json
316    /// {
317    ///   "jsonrpc": "2.0",
318    ///   "result": true,
319    ///   "id": 1
320    /// }
321    /// ```
322    ///
323    /// ## Notes
324    /// - Attempting to unsubscribe from a non-existent subscription will return an error.
325    /// - Successfully unsubscribed connections will no longer receive account notifications.
326    /// - The monitoring task automatically detects subscription removal and terminates gracefully.
327    /// - This method is thread-safe and can be called concurrently.
328    ///
329    /// ## See Also
330    /// - `accountSubscribe`: Create an account change subscription
331    #[pubsub(
332        subscription = "accountNotification",
333        unsubscribe,
334        name = "accountUnsubscribe"
335    )]
336    fn account_unsubscribe(
337        &self,
338        meta: Option<Self::Metadata>,
339        subscription: SubscriptionId,
340    ) -> Result<bool>;
341
342    /// Subscribe to slot notifications.
343    ///
344    /// This method allows clients to subscribe to updates for a specific slot.
345    /// The subscriber will receive notifications whenever the slot changes.
346    ///
347    /// ## Parameters
348    /// - `meta`: WebSocket metadata containing RPC context and connection information.
349    /// - `subscriber`: The subscription sink for sending slot update notifications to the client.
350    ///
351    /// ## Returns
352    /// This method does not return a value directly. Instead, it establishes a continuous WebSocket
353    /// subscription that will send `SlotInfo` notifications to the subscriber whenever
354    /// the slot changes.
355    ///
356    /// ## Example WebSocket Request
357    /// ```json
358    /// {
359    ///   "jsonrpc": "2.0",
360    ///   "id": 1,
361    ///   "method": "slotSubscribe",
362    ///   "params": [
363    ///     {
364    ///       "commitment": "finalized"
365    ///     }
366    ///   ]
367    /// }
368    /// ```
369    ///
370    /// ## Example WebSocket Response (Subscription Confirmation)
371    /// ```json
372    /// {
373    ///   "jsonrpc": "2.0",
374    ///   "result": 5207624,
375    ///   "id": 1
376    /// }
377    /// ```
378    ///
379    /// ## Example WebSocket Notification
380    /// ```json
381    /// {
382    ///   "jsonrpc": "2.0",
383    ///   "method": "slotNotification",
384    ///   "params": {
385    ///     "result": {
386    ///       "slot": 5207624
387    ///     },
388    ///     "subscription": 5207624
389    ///   }
390    /// }
391    /// ```
392    ///
393    /// ## Notes
394    /// - The subscription remains active until explicitly unsubscribed or the connection is closed.
395    /// - Slot notifications are sent whenever the slot changes.
396    /// - The subscription automatically terminates when the slot changes.
397    /// - Each subscription runs in its own async task for optimal performance.
398    ///
399    /// ## See Also
400    /// - `slotUnsubscribe`: Remove an active slot subscription
401    #[pubsub(subscription = "slotNotification", subscribe, name = "slotSubscribe")]
402    fn slot_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<SlotInfo>);
403
404    /// Unsubscribe from slot notifications.
405    ///
406    /// This method removes an active slot subscription, stopping further notifications
407    /// for the specified subscription ID.
408    ///
409    /// ## Parameters
410    /// - `meta`: Optional WebSocket metadata containing connection information.
411    /// - `subscription`: The subscription ID to remove, as returned by `slotSubscribe`.
412    ///
413    /// ## Returns
414    /// A `Result<bool>` indicating whether the unsubscription was successful:
415    /// - `Ok(true)` if the subscription was successfully removed
416    /// - `Err(Error)` with `InvalidParams` if the subscription ID doesn't exist
417    ///
418    /// ## Example WebSocket Request
419    /// ```json
420    /// {
421    ///   "jsonrpc": "2.0",
422    ///   "id": 1,
423    ///   "method": "slotUnsubscribe",
424    ///   "params": [0]
425    /// }
426    /// ```
427    ///
428    /// ## Example WebSocket Response
429    /// ```json
430    /// {
431    ///   "jsonrpc": "2.0",
432    ///   "result": true,
433    ///   "id": 1
434    /// }
435    /// ```
436    ///
437    /// ## Notes
438    /// - Attempting to unsubscribe from a non-existent subscription will return an error.
439    /// - Successfully unsubscribed connections will no longer receive notifications.
440    /// - This method is thread-safe and can be called concurrently.
441    ///
442    /// ## See Also
443    /// - `slotSubscribe`: Create a slot subscription
444    #[pubsub(
445        subscription = "slotNotification",
446        unsubscribe,
447        name = "slotUnsubscribe"
448    )]
449    fn slot_unsubscribe(
450        &self,
451        meta: Option<Self::Metadata>,
452        subscription: SubscriptionId,
453    ) -> Result<bool>;
454
455    /// Subscribe to logs notifications.
456    ///
457    /// This method allows clients to subscribe to transaction log messages
458    /// emitted during transaction execution. It supports filtering by signature,
459    /// account mentions, or all transactions.
460    ///
461    /// ## Parameters
462    /// - `meta`: WebSocket metadata containing RPC context and connection information.
463    /// - `subscriber`: The subscription sink for sending log notifications to the client.
464    /// - `mentions`: Optional filter for the subscription: can be a specific signature, account, or `"all"`.
465    /// - `commitment`: Optional commitment level for filtering logs by block finality.
466    ///
467    /// ## Returns
468    /// This method establishes a continuous WebSocket subscription that streams
469    /// `RpcLogsResponse` notifications as new transactions are processed.
470    ///
471    /// ## Example WebSocket Request
472    /// ```json
473    /// {
474    ///   "jsonrpc": "2.0",
475    ///   "id": 1,
476    ///   "method": "logsSubscribe",
477    ///   "params": [
478    ///     {
479    ///       "mentions": ["11111111111111111111111111111111"]
480    ///     },
481    ///     {
482    ///       "commitment": "finalized"
483    ///     }
484    ///   ]
485    /// }
486    /// ```
487    ///
488    /// ## Example WebSocket Response (Subscription Confirmation)
489    /// ```json
490    /// {
491    ///   "jsonrpc": "2.0",
492    ///   "result": 42,
493    ///   "id": 1
494    /// }
495    /// ```
496    ///
497    /// ## Example WebSocket Notification
498    /// ```json
499    /// {
500    ///   "jsonrpc": "2.0",
501    ///   "method": "logsNotification",
502    ///   "params": {
503    ///     "result": {
504    ///       "signature": "3s6n...",
505    ///       "err": null,
506    ///       "logs": ["Program 111111... invoke [1]", "Program 111111... success"]
507    ///     },
508    ///     "subscription": 42
509    ///   }
510    /// }
511    /// ```
512    ///
513    /// ## Notes
514    /// - The subscription remains active until explicitly unsubscribed or the connection is closed.
515    /// - Each log subscription runs independently and supports filtering.
516    /// - Log messages may be truncated depending on cluster configuration.
517    ///
518    /// ## See Also
519    /// - `logsUnsubscribe`: Remove an active logs subscription.
520    #[pubsub(subscription = "logsNotification", subscribe, name = "logsSubscribe")]
521    fn logs_subscribe(
522        &self,
523        meta: Self::Metadata,
524        subscriber: Subscriber<RpcResponse<RpcLogsResponse>>,
525        mentions: Option<RpcTransactionLogsFilter>,
526        commitment: Option<CommitmentLevel>,
527    );
528
529    /// Unsubscribe from logs notifications.
530    ///
531    /// This method removes an active logs subscription, stopping further notifications
532    /// for the specified subscription ID.
533    ///
534    /// ## Parameters
535    /// - `meta`: Optional WebSocket metadata containing connection information.
536    /// - `subscription`: The subscription ID to remove, as returned by `logsSubscribe`.
537    ///
538    /// ## Returns
539    /// A `Result<bool>` indicating whether the unsubscription was successful:
540    /// - `Ok(true)` if the subscription was successfully removed.
541    /// - `Err(Error)` with `InvalidParams` if the subscription ID is not recognized.
542    ///
543    /// ## Example WebSocket Request
544    /// ```json
545    /// {
546    ///   "jsonrpc": "2.0",
547    ///   "id": 1,
548    ///   "method": "logsUnsubscribe",
549    ///   "params": [42]
550    /// }
551    /// ```
552    ///
553    /// ## Example WebSocket Response
554    /// ```json
555    /// {
556    ///   "jsonrpc": "2.0",
557    ///   "result": true,
558    ///   "id": 1
559    /// }
560    /// ```
561    ///
562    /// ## Notes
563    /// - Unsubscribing from a non-existent subscription ID returns an error.
564    /// - Successfully unsubscribed clients will no longer receive logs notifications.
565    /// - This method is thread-safe and may be called concurrently.
566    ///
567    /// ## See Also
568    /// - `logsSubscribe`: Create a logs subscription.
569    #[pubsub(
570        subscription = "logsNotification",
571        unsubscribe,
572        name = "logsUnsubscribe"
573    )]
574    fn logs_unsubscribe(
575        &self,
576        meta: Option<Self::Metadata>,
577        subscription: SubscriptionId,
578    ) -> Result<bool>;
579}
580
581/// WebSocket RPC server implementation for Surfpool.
582///
583/// This struct manages WebSocket subscriptions for both signature status updates
584/// and account change notifications in the Surfpool environment. It provides a complete
585/// WebSocket RPC interface that allows clients to subscribe to real-time updates
586/// from the Solana Virtual Machine (SVM) and handles the lifecycle of WebSocket connections.
587///
588/// ## Fields
589/// - `uid`: Atomic counter for generating unique subscription IDs across all subscription types.
590/// - `signature_subscription_map`: Thread-safe HashMap containing active signature subscriptions, mapping subscription IDs to their notification sinks.
591/// - `account_subscription_map`: Thread-safe HashMap containing active account subscriptions, mapping subscription IDs to their notification sinks.
592/// - `slot_subscription_map`: Thread-safe HashMap containing active slot subscriptions, mapping subscription IDs to their notification sinks.
593/// - `tokio_handle`: Runtime handle for spawning asynchronous subscription monitoring tasks.
594///
595/// ## Features
596/// - **Concurrent Subscriptions**: Supports multiple simultaneous subscriptions without blocking.
597/// - **Thread Safety**: All subscription management operations are thread-safe using RwLock.
598/// - **Automatic Cleanup**: Subscriptions are automatically cleaned up when completed or unsubscribed.
599/// - **Efficient Monitoring**: Each subscription runs in its own async task for optimal performance.
600/// - **Real-time Updates**: Provides immediate notifications when monitored conditions are met.
601///
602/// ## Usage
603/// This struct implements the `Rpc` trait and is typically used as part of a larger
604/// WebSocket server infrastructure to provide real-time blockchain data to clients.
605///
606/// ## Notes
607/// - Each subscription is assigned a unique numeric ID for tracking and management.
608/// - The struct maintains separate maps for different subscription types to optimize performance.
609/// - All async operations are managed through the provided Tokio runtime handle.
610///
611/// ## See Also
612/// - `Rpc`: The trait interface this struct implements
613/// - `RpcAccountSubscribeConfig`: Configuration options for account subscriptions
614pub struct SurfpoolWsRpc {
615    pub uid: atomic::AtomicUsize,
616    pub signature_subscription_map:
617        Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<RpcSignatureResult>>>>>,
618    pub account_subscription_map:
619        Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<UiAccount>>>>>,
620    pub slot_subscription_map: Arc<RwLock<HashMap<SubscriptionId, Sink<SlotInfo>>>>,
621    pub logs_subscription_map:
622        Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<RpcLogsResponse>>>>>,
623    pub tokio_handle: tokio::runtime::Handle,
624}
625
626impl Rpc for SurfpoolWsRpc {
627    type Metadata = Option<SurfpoolWebsocketMeta>;
628
629    /// Implementation of signature subscription for WebSocket clients.
630    ///
631    /// This method handles the complete lifecycle of signature subscriptions:
632    /// 1. Validates the provided signature string format
633    /// 2. Determines the subscription type (received vs commitment-based)
634    /// 3. Checks if the transaction already exists in the desired state
635    /// 4. If found and confirmed, immediately notifies the subscriber
636    /// 5. Otherwise, sets up a continuous monitoring loop
637    /// 6. Spawns an async task to handle ongoing subscription management
638    ///
639    /// # Error Handling
640    /// - Rejects subscription with `InvalidParams` for malformed signatures
641    /// - Handles RPC context retrieval failures
642    /// - Manages subscription cleanup on completion or failure
643    ///
644    /// # Concurrency
645    /// Each subscription runs in its own async task, allowing multiple
646    /// concurrent subscriptions without blocking each other.
647    fn signature_subscribe(
648        &self,
649        meta: Self::Metadata,
650        subscriber: Subscriber<RpcResponse<RpcSignatureResult>>,
651        signature_str: String,
652        config: Option<RpcSignatureSubscribeConfig>,
653    ) {
654        let signature = match Signature::from_str(&signature_str) {
655            Ok(sig) => sig,
656            Err(_) => {
657                let error = Error {
658                    code: ErrorCode::InvalidParams,
659                    message: "Invalid signature format.".into(),
660                    data: None,
661                };
662                if let Err(e) = subscriber.reject(error.clone()) {
663                    log::error!("Failed to reject subscriber: {:?}", e);
664                }
665                return;
666            }
667        };
668        let config = config.unwrap_or_default();
669        let subscription_type = if config.enable_received_notification.unwrap_or(false) {
670            SignatureSubscriptionType::Received
671        } else {
672            SignatureSubscriptionType::Commitment(config.commitment.unwrap_or_default().commitment)
673        };
674
675        let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
676        let sub_id = SubscriptionId::Number(id as u64);
677        let sink = match subscriber.assign_id(sub_id.clone()) {
678            Ok(sink) => sink,
679            Err(e) => {
680                log::error!("Failed to assign subscription ID: {:?}", e);
681                return;
682            }
683        };
684        let active = Arc::clone(&self.signature_subscription_map);
685        let meta = meta.clone();
686
687        self.tokio_handle.spawn(async move {
688            if let Ok(mut guard) = active.write() {
689                guard.insert(sub_id.clone(), sink);
690            } else {
691                log::error!("Failed to acquire write lock on signature_subscription_map");
692                return;
693            }
694
695            let SurfnetRpcContext {
696                svm_locker,
697                remote_ctx,
698            } = match meta.get_rpc_context(()) {
699                Ok(res) => res,
700                Err(e) => {
701                    log::error!("Failed to get RPC context: {:?}", e);
702                    if let Ok(mut guard) = active.write() {
703                        if let Some(sink) = guard.remove(&sub_id) {
704                            if let Err(e) = sink.notify(Err(e.into())) {
705                                log::error!("Failed to notify client about RPC context error: {e}");
706                            }
707                        }
708                    }
709                    return;
710                }
711            };
712            // get the signature from the SVM to see if it's already been processed
713            let tx_result = match svm_locker
714                .get_transaction(
715                    &remote_ctx.map(|(r, _)| r),
716                    &signature,
717                    RpcTransactionConfig::default(),
718                )
719                .await
720            {
721                Ok(res) => res,
722                Err(e) => {
723                    if let Ok(mut guard) = active.write() {
724                        if let Some(sink) = guard.remove(&sub_id) {
725                            let _ = sink.notify(Err(e.into()));
726                        }
727                    }
728                    return;
729                }
730            };
731
732            // if we already had the transaction, check if its confirmation status matches the desired status set by the subscription
733            // if so, notify the user and complete the subscription
734            // otherwise, subscribe to the transaction updates
735            if let GetTransactionResult::FoundTransaction(_, _, tx) = tx_result {
736                match (&subscription_type, tx.confirmation_status) {
737                    (&SignatureSubscriptionType::Received, _)
738                    | (
739                        &SignatureSubscriptionType::Commitment(CommitmentLevel::Processed),
740                        Some(TransactionConfirmationStatus::Processed),
741                    )
742                    | (
743                        &SignatureSubscriptionType::Commitment(CommitmentLevel::Confirmed),
744                        Some(TransactionConfirmationStatus::Confirmed),
745                    )
746                    | (
747                        &SignatureSubscriptionType::Commitment(CommitmentLevel::Finalized),
748                        Some(TransactionConfirmationStatus::Finalized),
749                    ) => {
750                        if let Ok(mut guard) = active.write() {
751                            if let Some(sink) = guard.remove(&sub_id) {
752                                let _ = sink.notify(Ok(RpcResponse {
753                                    context: RpcResponseContext::new(tx.slot),
754                                    value: RpcSignatureResult::ProcessedSignature(
755                                        ProcessedSignatureResult { err: None },
756                                    ),
757                                }));
758                            }
759                        }
760                        return;
761                    }
762                    _ => {}
763                }
764            }
765
766            // update our surfnet SVM to subscribe to the signature updates
767            let rx =
768                svm_locker.subscribe_for_signature_updates(&signature, subscription_type.clone());
769
770            loop {
771                if let Ok((slot, some_err)) = rx.try_recv() {
772                    if let Ok(mut guard) = active.write() {
773                        if let Some(sink) = guard.remove(&sub_id) {
774                            match subscription_type {
775                                SignatureSubscriptionType::Received => {
776                                    let _ = sink.notify(Ok(RpcResponse {
777                                        context: RpcResponseContext::new(slot),
778                                        value: RpcSignatureResult::ReceivedSignature(
779                                            ReceivedSignatureResult::ReceivedSignature,
780                                        ),
781                                    }));
782                                }
783                                SignatureSubscriptionType::Commitment(_) => {
784                                    let _ = sink.notify(Ok(RpcResponse {
785                                        context: RpcResponseContext::new(slot),
786                                        value: RpcSignatureResult::ProcessedSignature(
787                                            ProcessedSignatureResult { err: some_err },
788                                        ),
789                                    }));
790                                }
791                            }
792                        }
793                    }
794                    return;
795                }
796                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
797            }
798        });
799    }
800
801    /// Implementation of signature unsubscription for WebSocket clients.
802    ///
803    /// This method removes an active signature subscription from the internal
804    /// tracking maps, effectively stopping further notifications for that subscription.
805    ///
806    /// # Implementation Details
807    /// - Attempts to remove the subscription from the active subscriptions map
808    /// - Returns success if the subscription existed and was removed
809    /// - Returns an error if the subscription ID was not found
810    ///
811    /// # Thread Safety
812    /// Uses write locks to ensure thread-safe removal from the subscription map.
813    fn signature_unsubscribe(
814        &self,
815        _meta: Option<Self::Metadata>,
816        subscription: SubscriptionId,
817    ) -> Result<bool> {
818        let removed = if let Ok(mut guard) = self.signature_subscription_map.write() {
819            guard.remove(&subscription)
820        } else {
821            log::error!("Failed to acquire write lock on signature_subscription_map");
822            None
823        };
824        if removed.is_some() {
825            Ok(true)
826        } else {
827            Err(Error {
828                code: ErrorCode::InvalidParams,
829                message: "Invalid subscription.".into(),
830                data: None,
831            })
832        }
833    }
834
835    /// Implementation of account subscription for WebSocket clients.
836    ///
837    /// This method handles the complete lifecycle of account subscriptions:
838    /// 1. Validates the provided public key string format
839    /// 2. Parses the subscription configuration (commitment and encoding)
840    /// 3. Generates a unique subscription ID and assigns it to the subscriber
841    /// 4. Spawns an async task to continuously monitor account changes
842    /// 5. Sends notifications whenever the account state changes
843    ///
844    /// # Monitoring Loop
845    /// The spawned task runs a continuous loop that:
846    /// - Checks if the subscription is still active (not unsubscribed)
847    /// - Polls for account updates from the SVM
848    /// - Sends notifications to the subscriber when changes occur
849    /// - Automatically terminates when the subscription is removed
850    ///
851    /// # Error Handling
852    /// - Rejects subscription with `InvalidParams` for malformed public keys
853    /// - Handles encoding configuration for account data serialization
854    /// - Manages subscription cleanup through the monitoring loop
855    ///
856    /// # Performance
857    /// Uses efficient polling with minimal CPU overhead and automatic
858    /// cleanup when subscriptions are no longer needed.
859    fn account_subscribe(
860        &self,
861        meta: Self::Metadata,
862        subscriber: Subscriber<RpcResponse<UiAccount>>,
863        pubkey_str: String,
864        config: Option<RpcAccountSubscribeConfig>,
865    ) {
866        let pubkey = match Pubkey::from_str(&pubkey_str) {
867            Ok(pk) => pk,
868            Err(_) => {
869                let error = Error {
870                    code: ErrorCode::InvalidParams,
871                    message: "Invalid pubkey format.".into(),
872                    data: None,
873                };
874                if subscriber.reject(error.clone()).is_err() {
875                    log::error!("Failed to reject subscriber for invalid pubkey format.");
876                }
877                return;
878            }
879        };
880
881        let config = config.unwrap_or(RpcAccountSubscribeConfig {
882            commitment: None,
883            encoding: None,
884        });
885
886        let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
887        let sub_id = SubscriptionId::Number(id as u64);
888        let sink = match subscriber.assign_id(sub_id.clone()) {
889            Ok(sink) => sink,
890            Err(e) => {
891                log::error!("Failed to assign subscription ID: {:?}", e);
892                return;
893            }
894        };
895
896        let account_active = Arc::clone(&self.account_subscription_map);
897        let meta = meta.clone();
898        let svm_locker = match meta.get_svm_locker() {
899            Ok(locker) => locker,
900            Err(e) => {
901                log::error!("Failed to get SVM locker for account subscription: {e}");
902                if let Err(e) = sink.notify(Err(e.into())) {
903                    log::error!(
904                        "Failed to send error notification to client for SVM locker failure: {e}"
905                    );
906                }
907                return;
908            }
909        };
910        let slot = svm_locker.with_svm_reader(|svm| svm.get_latest_absolute_slot());
911
912        self.tokio_handle.spawn(async move {
913            if let Ok(mut guard) = account_active.write() {
914                guard.insert(sub_id.clone(), sink);
915            } else {
916                log::error!("Failed to acquire write lock on account_subscription_map");
917                return;
918            }
919
920            // subscribe to account updates
921            let rx = svm_locker.subscribe_for_account_updates(&pubkey, config.encoding);
922
923            loop {
924                // if the subscription has been removed, break the loop
925                if let Ok(guard) = account_active.read() {
926                    if guard.get(&sub_id).is_none() {
927                        break;
928                    }
929                } else {
930                    log::error!("Failed to acquire read lock on account_subscription_map");
931                    break;
932                }
933
934                if let Ok(ui_account) = rx.try_recv() {
935                    if let Ok(guard) = account_active.read() {
936                        if let Some(sink) = guard.get(&sub_id) {
937                            let _ = sink.notify(Ok(RpcResponse {
938                                context: RpcResponseContext::new(slot),
939                                value: ui_account,
940                            }));
941                        }
942                    }
943                }
944            }
945        });
946    }
947
948    /// Implementation of account unsubscription for WebSocket clients.
949    ///
950    /// This method removes an active account subscription from the internal
951    /// tracking maps, effectively stopping further notifications for that subscription.
952    /// The monitoring loop in the corresponding subscription task will detect this
953    /// removal and automatically terminate.
954    ///
955    /// # Implementation Details
956    /// - Attempts to remove the subscription from the account subscriptions map
957    /// - Returns success if the subscription existed and was removed
958    /// - Returns an error if the subscription ID was not found
959    /// - The removal triggers automatic cleanup of the monitoring task
960    ///
961    /// # Thread Safety
962    /// Uses write locks to ensure thread-safe removal from the subscription map.
963    /// The monitoring task uses read locks to check subscription status, creating
964    /// a clean synchronization pattern.
965    fn account_unsubscribe(
966        &self,
967        _meta: Option<Self::Metadata>,
968        subscription: SubscriptionId,
969    ) -> Result<bool> {
970        let removed = if let Ok(mut guard) = self.account_subscription_map.write() {
971            guard.remove(&subscription)
972        } else {
973            log::error!("Failed to acquire write lock on account_subscription_map");
974            None
975        };
976        if removed.is_some() {
977            Ok(true)
978        } else {
979            Err(Error {
980                code: ErrorCode::InvalidParams,
981                message: "Invalid subscription.".into(),
982                data: None,
983            })
984        }
985    }
986
987    fn slot_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<SlotInfo>) {
988        let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
989        let sub_id = SubscriptionId::Number(id as u64);
990        let sink = match subscriber.assign_id(sub_id.clone()) {
991            Ok(sink) => sink,
992            Err(e) => {
993                log::error!("Failed to assign subscription ID: {:?}", e);
994                return;
995            }
996        };
997
998        let slot_active = Arc::clone(&self.slot_subscription_map);
999        let meta = meta.clone();
1000
1001        let svm_locker = match meta.get_svm_locker() {
1002            Ok(locker) => locker,
1003            Err(e) => {
1004                log::error!("Failed to get SVM locker for slot subscription: {e}");
1005                if let Err(e) = sink.notify(Err(e.into())) {
1006                    log::error!(
1007                        "Failed to send error notification to client for SVM locker failure: {e}"
1008                    );
1009                }
1010                return;
1011            }
1012        };
1013
1014        self.tokio_handle.spawn(async move {
1015            if let Ok(mut guard) = slot_active.write() {
1016                guard.insert(sub_id.clone(), sink);
1017            } else {
1018                log::error!("Failed to acquire write lock on slot_subscription_map");
1019                return;
1020            }
1021
1022            let rx = svm_locker.subscribe_for_slot_updates();
1023
1024            loop {
1025                // if the subscription has been removed, break the loop
1026                if let Ok(guard) = slot_active.read() {
1027                    if guard.get(&sub_id).is_none() {
1028                        break;
1029                    }
1030                } else {
1031                    log::error!("Failed to acquire read lock on slot_subscription_map");
1032                    break;
1033                }
1034
1035                if let Ok(slot_info) = rx.try_recv() {
1036                    if let Ok(guard) = slot_active.read() {
1037                        if let Some(sink) = guard.get(&sub_id) {
1038                            let _ = sink.notify(Ok(slot_info));
1039                        }
1040                    }
1041                }
1042            }
1043        });
1044    }
1045
1046    fn slot_unsubscribe(
1047        &self,
1048        _meta: Option<Self::Metadata>,
1049        subscription: SubscriptionId,
1050    ) -> Result<bool> {
1051        let removed = if let Ok(mut guard) = self.slot_subscription_map.write() {
1052            guard.remove(&subscription)
1053        } else {
1054            log::error!("Failed to acquire write lock on slot_subscription_map");
1055            None
1056        };
1057        if removed.is_some() {
1058            Ok(true)
1059        } else {
1060            Err(Error {
1061                code: ErrorCode::InvalidParams,
1062                message: "Invalid subscription.".into(),
1063                data: None,
1064            })
1065        }
1066    }
1067
1068    fn logs_subscribe(
1069        &self,
1070        meta: Self::Metadata,
1071        subscriber: Subscriber<RpcResponse<RpcLogsResponse>>,
1072        mentions: Option<RpcTransactionLogsFilter>,
1073        commitment: Option<CommitmentLevel>,
1074    ) {
1075        let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
1076        let sub_id = SubscriptionId::Number(id as u64);
1077        let sink = match subscriber.assign_id(sub_id.clone()) {
1078            Ok(sink) => sink,
1079            Err(e) => {
1080                log::error!("Failed to assign subscription ID: {:?}", e);
1081                return;
1082            }
1083        };
1084
1085        let mentions = mentions.unwrap_or(RpcTransactionLogsFilter::All);
1086        let commitment = commitment.unwrap_or(CommitmentLevel::Confirmed);
1087
1088        let logs_active = Arc::clone(&self.logs_subscription_map);
1089        let meta = meta.clone();
1090
1091        let svm_locker = match meta.get_svm_locker() {
1092            Ok(locker) => locker,
1093            Err(e) => {
1094                log::error!("Failed to get SVM locker for slot subscription: {e}");
1095                if let Err(e) = sink.notify(Err(e.into())) {
1096                    log::error!(
1097                        "Failed to send error notification to client for SVM locker failure: {e}"
1098                    );
1099                }
1100                return;
1101            }
1102        };
1103
1104        self.tokio_handle.spawn(async move {
1105            if let Ok(mut guard) = logs_active.write() {
1106                guard.insert(sub_id.clone(), sink);
1107            } else {
1108                log::error!("Failed to acquire write lock on slot_subscription_map");
1109                return;
1110            }
1111
1112            let rx = svm_locker.subscribe_for_logs_updates(&commitment, &mentions);
1113
1114            loop {
1115                // if the subscription has been removed, break the loop
1116                if let Ok(guard) = logs_active.read() {
1117                    if guard.get(&sub_id).is_none() {
1118                        break;
1119                    }
1120                } else {
1121                    log::error!("Failed to acquire read lock on slot_subscription_map");
1122                    break;
1123                }
1124
1125                if let Ok((slot, value)) = rx.try_recv() {
1126                    if let Ok(guard) = logs_active.read() {
1127                        if let Some(sink) = guard.get(&sub_id) {
1128                            let _ = sink.notify(Ok(RpcResponse {
1129                                context: RpcResponseContext::new(slot),
1130                                value,
1131                            }));
1132                        }
1133                    }
1134                }
1135            }
1136        });
1137    }
1138
1139    fn logs_unsubscribe(
1140        &self,
1141        _meta: Option<Self::Metadata>,
1142        subscription: SubscriptionId,
1143    ) -> Result<bool> {
1144        let removed = if let Ok(mut guard) = self.logs_subscription_map.write() {
1145            guard.remove(&subscription)
1146        } else {
1147            log::error!("Failed to acquire write lock on logs_subscription_map");
1148            None
1149        };
1150        if removed.is_some() {
1151            Ok(true)
1152        } else {
1153            Err(Error {
1154                code: ErrorCode::InvalidParams,
1155                message: "Invalid subscription.".into(),
1156                data: None,
1157            })
1158        }
1159    }
1160}