surfpool_core/rpc/ws.rs
1use std::{
2 collections::HashMap,
3 str::FromStr,
4 sync::{atomic, Arc, RwLock},
5};
6
7use jsonrpc_core::{Error, ErrorCode, Result};
8use jsonrpc_derive::rpc;
9use jsonrpc_pubsub::{
10 typed::{Sink, Subscriber},
11 SubscriptionId,
12};
13use solana_account_decoder::{UiAccount, UiAccountEncoding};
14use solana_client::{
15 rpc_config::RpcSignatureSubscribeConfig,
16 rpc_response::{
17 ProcessedSignatureResult, ReceivedSignatureResult, RpcResponseContext, RpcSignatureResult,
18 },
19};
20use solana_commitment_config::{CommitmentConfig, CommitmentLevel};
21use solana_pubkey::Pubkey;
22use solana_rpc_client_api::response::{Response as RpcResponse, SlotInfo};
23use solana_signature::Signature;
24use solana_transaction_status::TransactionConfirmationStatus;
25
26use super::{State, SurfnetRpcContext, SurfpoolWebsocketMeta};
27use crate::surfnet::{locker::SvmAccessContext, GetTransactionResult, SignatureSubscriptionType};
28
29/// Configuration for account subscription requests.
30///
31/// This struct defines the parameters that clients can specify when subscribing
32/// to account change notifications through WebSocket connections. It allows customization
33/// of both the commitment level for updates and the encoding format for account data.
34///
35/// ## Fields
36/// - `commitment`: Optional commitment configuration specifying when to send notifications
37/// (processed, confirmed, or finalized). Defaults to the node's default commitment level.
38/// - `encoding`: Optional encoding format for account data serialization (base58, base64, jsonParsed, etc.).
39/// Defaults to base58 encoding if not specified.
40///
41/// ## Usage
42/// Clients can provide this configuration to customize their subscription behavior:
43/// - Set commitment level to control notification timing based on confirmation status
44/// - Set encoding to specify the preferred format for receiving account data
45///
46/// ## Example Usage
47/// ```json
48/// {
49/// "commitment": "confirmed",
50/// "encoding": "base64"
51/// }
52/// ```
53#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
54#[serde(rename_all = "camelCase")]
55pub struct RpcAccountSubscribeConfig {
56 #[serde(flatten)]
57 pub commitment: Option<CommitmentConfig>,
58 pub encoding: Option<UiAccountEncoding>,
59}
60
61#[rpc]
62pub trait Rpc {
63 type Metadata;
64
65 /// Subscribe to signature status notifications via WebSocket.
66 ///
67 /// This method allows clients to subscribe to status updates for a specific transaction signature.
68 /// The subscriber will receive notifications when the transaction reaches the desired confirmation level
69 /// or when it's initially received by the network (if configured).
70 ///
71 /// ## Parameters
72 /// - `meta`: WebSocket metadata containing RPC context and connection information.
73 /// - `subscriber`: The subscription sink for sending signature status notifications to the client.
74 /// - `signature_str`: The transaction signature to monitor, as a base-58 encoded string.
75 /// - `config`: Optional configuration specifying commitment level and notification preferences.
76 ///
77 /// ## Returns
78 /// This method does not return a value directly. Instead, it establishes a WebSocket subscription
79 /// that will send `RpcResponse<RpcSignatureResult>` notifications to the subscriber when the
80 /// transaction status changes.
81 ///
82 /// ## Example WebSocket Request
83 /// ```json
84 /// {
85 /// "jsonrpc": "2.0",
86 /// "id": 1,
87 /// "method": "signatureSubscribe",
88 /// "params": [
89 /// "2id3YC2jK9G5Wo2phDx4gJVAew8DcY5NAojnVuao8rkxwPYPe8cSwE5GzhEgJA2y8fVjDEo6iR6ykBvDxrTQrtpb",
90 /// {
91 /// "commitment": "finalized",
92 /// "enableReceivedNotification": false
93 /// }
94 /// ]
95 /// }
96 /// ```
97 ///
98 /// ## Example WebSocket Response (Subscription Confirmation)
99 /// ```json
100 /// {
101 /// "jsonrpc": "2.0",
102 /// "result": 0,
103 /// "id": 1
104 /// }
105 /// ```
106 ///
107 /// ## Example WebSocket Notification
108 /// ```json
109 /// {
110 /// "jsonrpc": "2.0",
111 /// "method": "signatureNotification",
112 /// "params": {
113 /// "result": {
114 /// "context": {
115 /// "slot": 5207624
116 /// },
117 /// "value": {
118 /// "err": null
119 /// }
120 /// },
121 /// "subscription": 0
122 /// }
123 /// }
124 /// ```
125 ///
126 /// ## Notes
127 /// - If the transaction already exists with the desired confirmation status, the subscriber
128 /// will be notified immediately and the subscription will complete.
129 /// - The subscription automatically terminates after sending the first matching notification.
130 /// - Invalid signature formats will cause the subscription to be rejected with an error.
131 /// - Each subscription runs in its own async task for optimal performance.
132 ///
133 /// ## See Also
134 /// - `signatureUnsubscribe`: Remove an active signature subscription
135 /// - `getSignatureStatuses`: Get current status of multiple signatures
136 #[pubsub(
137 subscription = "signatureNotification",
138 subscribe,
139 name = "signatureSubscribe"
140 )]
141 fn signature_subscribe(
142 &self,
143 meta: Self::Metadata,
144 subscriber: Subscriber<RpcResponse<RpcSignatureResult>>,
145 signature_str: String,
146 config: Option<RpcSignatureSubscribeConfig>,
147 );
148
149 /// Unsubscribe from signature status notifications.
150 ///
151 /// This method removes an active signature subscription, stopping further notifications
152 /// for the specified subscription ID.
153 ///
154 /// ## Parameters
155 /// - `meta`: Optional WebSocket metadata containing connection information.
156 /// - `subscription`: The subscription ID to remove, as returned by `signatureSubscribe`.
157 ///
158 /// ## Returns
159 /// A `Result<bool>` indicating whether the unsubscription was successful:
160 /// - `Ok(true)` if the subscription was successfully removed
161 /// - `Err(Error)` with `InvalidParams` if the subscription ID doesn't exist
162 ///
163 /// ## Example WebSocket Request
164 /// ```json
165 /// {
166 /// "jsonrpc": "2.0",
167 /// "id": 1,
168 /// "method": "signatureUnsubscribe",
169 /// "params": [0]
170 /// }
171 /// ```
172 ///
173 /// ## Example WebSocket Response
174 /// ```json
175 /// {
176 /// "jsonrpc": "2.0",
177 /// "result": true,
178 /// "id": 1
179 /// }
180 /// ```
181 ///
182 /// ## Notes
183 /// - Attempting to unsubscribe from a non-existent subscription will return an error.
184 /// - Successfully unsubscribed connections will no longer receive notifications.
185 /// - This method is thread-safe and can be called concurrently.
186 ///
187 /// ## See Also
188 /// - `signatureSubscribe`: Create a signature status subscription
189 #[pubsub(
190 subscription = "signatureNotification",
191 unsubscribe,
192 name = "signatureUnsubscribe"
193 )]
194 fn signature_unsubscribe(
195 &self,
196 meta: Option<Self::Metadata>,
197 subscription: SubscriptionId,
198 ) -> Result<bool>;
199
200 /// Subscribe to account change notifications via WebSocket.
201 ///
202 /// This method allows clients to subscribe to updates for a specific account.
203 /// The subscriber will receive notifications whenever the account's data, lamports balance,
204 /// ownership, or other properties change.
205 ///
206 /// ## Parameters
207 /// - `meta`: WebSocket metadata containing RPC context and connection information.
208 /// - `subscriber`: The subscription sink for sending account update notifications to the client.
209 /// - `pubkey_str`: The account public key to monitor, as a base-58 encoded string.
210 /// - `config`: Optional configuration specifying commitment level and encoding format for account data.
211 ///
212 /// ## Returns
213 /// This method does not return a value directly. Instead, it establishes a continuous WebSocket
214 /// subscription that will send `RpcResponse<UiAccount>` notifications to the subscriber whenever
215 /// the account state changes.
216 ///
217 /// ## Example WebSocket Request
218 /// ```json
219 /// {
220 /// "jsonrpc": "2.0",
221 /// "id": 1,
222 /// "method": "accountSubscribe",
223 /// "params": [
224 /// "CM78CPUeXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNH12",
225 /// {
226 /// "commitment": "finalized",
227 /// "encoding": "base64"
228 /// }
229 /// ]
230 /// }
231 /// ```
232 ///
233 /// ## Example WebSocket Response (Subscription Confirmation)
234 /// ```json
235 /// {
236 /// "jsonrpc": "2.0",
237 /// "result": 23784,
238 /// "id": 1
239 /// }
240 /// ```
241 ///
242 /// ## Example WebSocket Notification
243 /// ```json
244 /// {
245 /// "jsonrpc": "2.0",
246 /// "method": "accountNotification",
247 /// "params": {
248 /// "result": {
249 /// "context": {
250 /// "slot": 5208469
251 /// },
252 /// "value": {
253 /// "data": ["base64EncodedAccountData", "base64"],
254 /// "executable": false,
255 /// "lamports": 33594,
256 /// "owner": "11111111111111111111111111111112",
257 /// "rentEpoch": 636
258 /// }
259 /// },
260 /// "subscription": 23784
261 /// }
262 /// }
263 /// ```
264 ///
265 /// ## Notes
266 /// - The subscription remains active until explicitly unsubscribed or the connection is closed.
267 /// - Account notifications are sent whenever any aspect of the account changes.
268 /// - The encoding format specified in the config determines how account data is serialized.
269 /// - Invalid public key formats will cause the subscription to be rejected with an error.
270 /// - Each subscription runs in its own async task to ensure optimal performance.
271 ///
272 /// ## See Also
273 /// - `accountUnsubscribe`: Remove an active account subscription
274 /// - `getAccountInfo`: Get current account information
275 #[pubsub(
276 subscription = "accountNotification",
277 subscribe,
278 name = "accountSubscribe"
279 )]
280 fn account_subscribe(
281 &self,
282 meta: Self::Metadata,
283 subscriber: Subscriber<RpcResponse<UiAccount>>,
284 pubkey_str: String,
285 config: Option<RpcAccountSubscribeConfig>,
286 );
287
288 /// Unsubscribe from account change notifications.
289 ///
290 /// This method removes an active account subscription, stopping further notifications
291 /// for the specified subscription ID. The monitoring task will automatically terminate
292 /// when the subscription is removed.
293 ///
294 /// ## Parameters
295 /// - `meta`: Optional WebSocket metadata containing connection information.
296 /// - `subscription`: The subscription ID to remove, as returned by `accountSubscribe`.
297 ///
298 /// ## Returns
299 /// A `Result<bool>` indicating whether the unsubscription was successful:
300 /// - `Ok(true)` if the subscription was successfully removed
301 /// - `Err(Error)` with `InvalidParams` if the subscription ID doesn't exist
302 ///
303 /// ## Example WebSocket Request
304 /// ```json
305 /// {
306 /// "jsonrpc": "2.0",
307 /// "id": 1,
308 /// "method": "accountUnsubscribe",
309 /// "params": [23784]
310 /// }
311 /// ```
312 ///
313 /// ## Example WebSocket Response
314 /// ```json
315 /// {
316 /// "jsonrpc": "2.0",
317 /// "result": true,
318 /// "id": 1
319 /// }
320 /// ```
321 ///
322 /// ## Notes
323 /// - Attempting to unsubscribe from a non-existent subscription will return an error.
324 /// - Successfully unsubscribed connections will no longer receive account notifications.
325 /// - The monitoring task automatically detects subscription removal and terminates gracefully.
326 /// - This method is thread-safe and can be called concurrently.
327 ///
328 /// ## See Also
329 /// - `accountSubscribe`: Create an account change subscription
330 #[pubsub(
331 subscription = "accountNotification",
332 unsubscribe,
333 name = "accountUnsubscribe"
334 )]
335 fn account_unsubscribe(
336 &self,
337 meta: Option<Self::Metadata>,
338 subscription: SubscriptionId,
339 ) -> Result<bool>;
340
341 /// Subscribe to slot notifications.
342 ///
343 /// This method allows clients to subscribe to updates for a specific slot.
344 /// The subscriber will receive notifications whenever the slot changes.
345 ///
346 /// ## Parameters
347 /// - `meta`: WebSocket metadata containing RPC context and connection information.
348 /// - `subscriber`: The subscription sink for sending slot update notifications to the client.
349 ///
350 /// ## Returns
351 /// This method does not return a value directly. Instead, it establishes a continuous WebSocket
352 /// subscription that will send `SlotInfo` notifications to the subscriber whenever
353 /// the slot changes.
354 ///
355 /// ## Example WebSocket Request
356 /// ```json
357 /// {
358 /// "jsonrpc": "2.0",
359 /// "id": 1,
360 /// "method": "slotSubscribe",
361 /// "params": [
362 /// {
363 /// "commitment": "finalized"
364 /// }
365 /// ]
366 /// }
367 /// ```
368 ///
369 /// ## Example WebSocket Response (Subscription Confirmation)
370 /// ```json
371 /// {
372 /// "jsonrpc": "2.0",
373 /// "result": 5207624,
374 /// "id": 1
375 /// }
376 /// ```
377 ///
378 /// ## Example WebSocket Notification
379 /// ```json
380 /// {
381 /// "jsonrpc": "2.0",
382 /// "method": "slotNotification",
383 /// "params": {
384 /// "result": {
385 /// "slot": 5207624
386 /// },
387 /// "subscription": 5207624
388 /// }
389 /// }
390 /// ```
391 ///
392 /// ## Notes
393 /// - The subscription remains active until explicitly unsubscribed or the connection is closed.
394 /// - Slot notifications are sent whenever the slot changes.
395 /// - The subscription automatically terminates when the slot changes.
396 /// - Each subscription runs in its own async task for optimal performance.
397 ///
398 /// ## See Also
399 /// - `slotUnsubscribe`: Remove an active slot subscription
400 #[pubsub(subscription = "slotNotification", subscribe, name = "slotSubscribe")]
401 fn slot_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<SlotInfo>);
402
403 /// Unsubscribe from slot notifications.
404 ///
405 /// This method removes an active slot subscription, stopping further notifications
406 /// for the specified subscription ID.
407 ///
408 /// ## Parameters
409 /// - `meta`: Optional WebSocket metadata containing connection information.
410 /// - `subscription`: The subscription ID to remove, as returned by `slotSubscribe`.
411 ///
412 /// ## Returns
413 /// A `Result<bool>` indicating whether the unsubscription was successful:
414 /// - `Ok(true)` if the subscription was successfully removed
415 /// - `Err(Error)` with `InvalidParams` if the subscription ID doesn't exist
416 ///
417 /// ## Example WebSocket Request
418 /// ```json
419 /// {
420 /// "jsonrpc": "2.0",
421 /// "id": 1,
422 /// "method": "slotUnsubscribe",
423 /// "params": [0]
424 /// }
425 /// ```
426 ///
427 /// ## Example WebSocket Response
428 /// ```json
429 /// {
430 /// "jsonrpc": "2.0",
431 /// "result": true,
432 /// "id": 1
433 /// }
434 /// ```
435 ///
436 /// ## Notes
437 /// - Attempting to unsubscribe from a non-existent subscription will return an error.
438 /// - Successfully unsubscribed connections will no longer receive notifications.
439 /// - This method is thread-safe and can be called concurrently.
440 ///
441 /// ## See Also
442 /// - `slotSubscribe`: Create a slot subscription
443 #[pubsub(
444 subscription = "slotNotification",
445 unsubscribe,
446 name = "slotUnsubscribe"
447 )]
448 fn slot_unsubscribe(
449 &self,
450 meta: Option<Self::Metadata>,
451 subscription: SubscriptionId,
452 ) -> Result<bool>;
453}
454
455/// WebSocket RPC server implementation for Surfpool.
456///
457/// This struct manages WebSocket subscriptions for both signature status updates
458/// and account change notifications in the Surfpool environment. It provides a complete
459/// WebSocket RPC interface that allows clients to subscribe to real-time updates
460/// from the Solana Virtual Machine (SVM) and handles the lifecycle of WebSocket connections.
461///
462/// ## Fields
463/// - `uid`: Atomic counter for generating unique subscription IDs across all subscription types.
464/// - `signature_subscription_map`: Thread-safe HashMap containing active signature subscriptions, mapping subscription IDs to their notification sinks.
465/// - `account_subscription_map`: Thread-safe HashMap containing active account subscriptions, mapping subscription IDs to their notification sinks.
466/// - `slot_subscription_map`: Thread-safe HashMap containing active slot subscriptions, mapping subscription IDs to their notification sinks.
467/// - `tokio_handle`: Runtime handle for spawning asynchronous subscription monitoring tasks.
468///
469/// ## Features
470/// - **Concurrent Subscriptions**: Supports multiple simultaneous subscriptions without blocking.
471/// - **Thread Safety**: All subscription management operations are thread-safe using RwLock.
472/// - **Automatic Cleanup**: Subscriptions are automatically cleaned up when completed or unsubscribed.
473/// - **Efficient Monitoring**: Each subscription runs in its own async task for optimal performance.
474/// - **Real-time Updates**: Provides immediate notifications when monitored conditions are met.
475///
476/// ## Usage
477/// This struct implements the `Rpc` trait and is typically used as part of a larger
478/// WebSocket server infrastructure to provide real-time blockchain data to clients.
479///
480/// ## Notes
481/// - Each subscription is assigned a unique numeric ID for tracking and management.
482/// - The struct maintains separate maps for different subscription types to optimize performance.
483/// - All async operations are managed through the provided Tokio runtime handle.
484///
485/// ## See Also
486/// - `Rpc`: The trait interface this struct implements
487/// - `RpcAccountSubscribeConfig`: Configuration options for account subscriptions
488pub struct SurfpoolWsRpc {
489 pub uid: atomic::AtomicUsize,
490 pub signature_subscription_map:
491 Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<RpcSignatureResult>>>>>,
492 pub account_subscription_map:
493 Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<UiAccount>>>>>,
494 pub slot_subscription_map: Arc<RwLock<HashMap<SubscriptionId, Sink<SlotInfo>>>>,
495 pub tokio_handle: tokio::runtime::Handle,
496}
497
498impl Rpc for SurfpoolWsRpc {
499 type Metadata = Option<SurfpoolWebsocketMeta>;
500
501 /// Implementation of signature subscription for WebSocket clients.
502 ///
503 /// This method handles the complete lifecycle of signature subscriptions:
504 /// 1. Validates the provided signature string format
505 /// 2. Determines the subscription type (received vs commitment-based)
506 /// 3. Checks if the transaction already exists in the desired state
507 /// 4. If found and confirmed, immediately notifies the subscriber
508 /// 5. Otherwise, sets up a continuous monitoring loop
509 /// 6. Spawns an async task to handle ongoing subscription management
510 ///
511 /// # Error Handling
512 /// - Rejects subscription with `InvalidParams` for malformed signatures
513 /// - Handles RPC context retrieval failures
514 /// - Manages subscription cleanup on completion or failure
515 ///
516 /// # Concurrency
517 /// Each subscription runs in its own async task, allowing multiple
518 /// concurrent subscriptions without blocking each other.
519 fn signature_subscribe(
520 &self,
521 meta: Self::Metadata,
522 subscriber: Subscriber<RpcResponse<RpcSignatureResult>>,
523 signature_str: String,
524 config: Option<RpcSignatureSubscribeConfig>,
525 ) {
526 let signature = match Signature::from_str(&signature_str) {
527 Ok(sig) => sig,
528 Err(_) => {
529 let error = Error {
530 code: ErrorCode::InvalidParams,
531 message: "Invalid signature format.".into(),
532 data: None,
533 };
534 if let Err(e) = subscriber.reject(error.clone()) {
535 log::error!("Failed to reject subscriber: {:?}", e);
536 }
537 return;
538 }
539 };
540 let config = config.unwrap_or_default();
541 let subscription_type = if config.enable_received_notification.unwrap_or(false) {
542 SignatureSubscriptionType::Received
543 } else {
544 SignatureSubscriptionType::Commitment(config.commitment.unwrap_or_default().commitment)
545 };
546
547 let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
548 let sub_id = SubscriptionId::Number(id as u64);
549 let sink = match subscriber.assign_id(sub_id.clone()) {
550 Ok(sink) => sink,
551 Err(e) => {
552 log::error!("Failed to assign subscription ID: {:?}", e);
553 return;
554 }
555 };
556 let active = Arc::clone(&self.signature_subscription_map);
557 let meta = meta.clone();
558
559 self.tokio_handle.spawn(async move {
560 if let Ok(mut guard) = active.write() {
561 guard.insert(sub_id.clone(), sink);
562 } else {
563 log::error!("Failed to acquire write lock on signature_subscription_map");
564 return;
565 }
566
567 let SurfnetRpcContext {
568 svm_locker,
569 remote_ctx,
570 } = match meta.get_rpc_context(None) {
571 Ok(res) => res,
572 Err(e) => {
573 log::error!("Failed to get RPC context: {:?}", e);
574 if let Ok(mut guard) = active.write() {
575 if let Some(sink) = guard.remove(&sub_id) {
576 if let Err(e) = sink.notify(Err(e.into())) {
577 log::error!("Failed to notify client about RPC context error: {e}");
578 }
579 }
580 }
581 return;
582 }
583 };
584 // get the signature from the SVM to see if it's already been processed
585 let SvmAccessContext {
586 inner: tx_result, ..
587 } = svm_locker.get_transaction(&remote_ctx, &signature).await;
588
589 // if we already had the transaction, check if its confirmation status matches the desired status set by the subscription
590 // if so, notify the user and complete the subscription
591 // otherwise, subscribe to the transaction updates
592 if let GetTransactionResult::FoundTransaction(_, _, tx) = tx_result {
593 match (&subscription_type, tx.confirmation_status) {
594 (&SignatureSubscriptionType::Received, _)
595 | (
596 &SignatureSubscriptionType::Commitment(CommitmentLevel::Processed),
597 Some(TransactionConfirmationStatus::Processed),
598 )
599 | (
600 &SignatureSubscriptionType::Commitment(CommitmentLevel::Confirmed),
601 Some(TransactionConfirmationStatus::Confirmed),
602 )
603 | (
604 &SignatureSubscriptionType::Commitment(CommitmentLevel::Finalized),
605 Some(TransactionConfirmationStatus::Finalized),
606 ) => {
607 if let Ok(mut guard) = active.write() {
608 if let Some(sink) = guard.remove(&sub_id) {
609 let _ = sink.notify(Ok(RpcResponse {
610 context: RpcResponseContext::new(tx.slot),
611 value: RpcSignatureResult::ProcessedSignature(
612 ProcessedSignatureResult { err: None },
613 ),
614 }));
615 }
616 }
617 return;
618 }
619 _ => {}
620 }
621 }
622
623 // update our surfnet SVM to subscribe to the signature updates
624 let rx =
625 svm_locker.subscribe_for_signature_updates(&signature, subscription_type.clone());
626
627 loop {
628 if let Ok((slot, some_err)) = rx.try_recv() {
629 if let Ok(mut guard) = active.write() {
630 if let Some(sink) = guard.remove(&sub_id) {
631 match subscription_type {
632 SignatureSubscriptionType::Received => {
633 let _ = sink.notify(Ok(RpcResponse {
634 context: RpcResponseContext::new(slot),
635 value: RpcSignatureResult::ReceivedSignature(
636 ReceivedSignatureResult::ReceivedSignature,
637 ),
638 }));
639 }
640 SignatureSubscriptionType::Commitment(_) => {
641 let _ = sink.notify(Ok(RpcResponse {
642 context: RpcResponseContext::new(slot),
643 value: RpcSignatureResult::ProcessedSignature(
644 ProcessedSignatureResult { err: some_err },
645 ),
646 }));
647 }
648 }
649 }
650 }
651 return;
652 }
653 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
654 }
655 });
656 }
657
658 /// Implementation of signature unsubscription for WebSocket clients.
659 ///
660 /// This method removes an active signature subscription from the internal
661 /// tracking maps, effectively stopping further notifications for that subscription.
662 ///
663 /// # Implementation Details
664 /// - Attempts to remove the subscription from the active subscriptions map
665 /// - Returns success if the subscription existed and was removed
666 /// - Returns an error if the subscription ID was not found
667 ///
668 /// # Thread Safety
669 /// Uses write locks to ensure thread-safe removal from the subscription map.
670 fn signature_unsubscribe(
671 &self,
672 _meta: Option<Self::Metadata>,
673 subscription: SubscriptionId,
674 ) -> Result<bool> {
675 let removed = if let Ok(mut guard) = self.signature_subscription_map.write() {
676 guard.remove(&subscription)
677 } else {
678 log::error!("Failed to acquire write lock on signature_subscription_map");
679 None
680 };
681 if removed.is_some() {
682 Ok(true)
683 } else {
684 Err(Error {
685 code: ErrorCode::InvalidParams,
686 message: "Invalid subscription.".into(),
687 data: None,
688 })
689 }
690 }
691
692 /// Implementation of account subscription for WebSocket clients.
693 ///
694 /// This method handles the complete lifecycle of account subscriptions:
695 /// 1. Validates the provided public key string format
696 /// 2. Parses the subscription configuration (commitment and encoding)
697 /// 3. Generates a unique subscription ID and assigns it to the subscriber
698 /// 4. Spawns an async task to continuously monitor account changes
699 /// 5. Sends notifications whenever the account state changes
700 ///
701 /// # Monitoring Loop
702 /// The spawned task runs a continuous loop that:
703 /// - Checks if the subscription is still active (not unsubscribed)
704 /// - Polls for account updates from the SVM
705 /// - Sends notifications to the subscriber when changes occur
706 /// - Automatically terminates when the subscription is removed
707 ///
708 /// # Error Handling
709 /// - Rejects subscription with `InvalidParams` for malformed public keys
710 /// - Handles encoding configuration for account data serialization
711 /// - Manages subscription cleanup through the monitoring loop
712 ///
713 /// # Performance
714 /// Uses efficient polling with minimal CPU overhead and automatic
715 /// cleanup when subscriptions are no longer needed.
716 fn account_subscribe(
717 &self,
718 meta: Self::Metadata,
719 subscriber: Subscriber<RpcResponse<UiAccount>>,
720 pubkey_str: String,
721 config: Option<RpcAccountSubscribeConfig>,
722 ) {
723 let pubkey = match Pubkey::from_str(&pubkey_str) {
724 Ok(pk) => pk,
725 Err(_) => {
726 let error = Error {
727 code: ErrorCode::InvalidParams,
728 message: "Invalid pubkey format.".into(),
729 data: None,
730 };
731 if subscriber.reject(error.clone()).is_err() {
732 log::error!("Failed to reject subscriber for invalid pubkey format.");
733 }
734 return;
735 }
736 };
737
738 let config = config.unwrap_or(RpcAccountSubscribeConfig {
739 commitment: None,
740 encoding: None,
741 });
742
743 let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
744 let sub_id = SubscriptionId::Number(id as u64);
745 let sink = match subscriber.assign_id(sub_id.clone()) {
746 Ok(sink) => sink,
747 Err(e) => {
748 log::error!("Failed to assign subscription ID: {:?}", e);
749 return;
750 }
751 };
752
753 let account_active = Arc::clone(&self.account_subscription_map);
754 let meta = meta.clone();
755 let svm_locker = match meta.get_svm_locker() {
756 Ok(locker) => locker,
757 Err(e) => {
758 log::error!("Failed to get SVM locker for account subscription: {e}");
759 if let Err(e) = sink.notify(Err(e.into())) {
760 log::error!(
761 "Failed to send error notification to client for SVM locker failure: {e}"
762 );
763 }
764 return;
765 }
766 };
767 let slot = svm_locker.with_svm_reader(|svm| svm.get_latest_absolute_slot());
768
769 self.tokio_handle.spawn(async move {
770 if let Ok(mut guard) = account_active.write() {
771 guard.insert(sub_id.clone(), sink);
772 } else {
773 log::error!("Failed to acquire write lock on account_subscription_map");
774 return;
775 }
776
777 // subscribe to account updates
778 let rx = svm_locker.subscribe_for_account_updates(&pubkey, config.encoding);
779
780 loop {
781 // if the subscription has been removed, break the loop
782 if let Ok(guard) = account_active.read() {
783 if guard.get(&sub_id).is_none() {
784 break;
785 }
786 } else {
787 log::error!("Failed to acquire read lock on account_subscription_map");
788 break;
789 }
790
791 if let Ok(ui_account) = rx.try_recv() {
792 if let Ok(guard) = account_active.read() {
793 if let Some(sink) = guard.get(&sub_id) {
794 let _ = sink.notify(Ok(RpcResponse {
795 context: RpcResponseContext::new(slot),
796 value: ui_account,
797 }));
798 }
799 }
800 }
801 }
802 });
803 }
804
805 /// Implementation of account unsubscription for WebSocket clients.
806 ///
807 /// This method removes an active account subscription from the internal
808 /// tracking maps, effectively stopping further notifications for that subscription.
809 /// The monitoring loop in the corresponding subscription task will detect this
810 /// removal and automatically terminate.
811 ///
812 /// # Implementation Details
813 /// - Attempts to remove the subscription from the account subscriptions map
814 /// - Returns success if the subscription existed and was removed
815 /// - Returns an error if the subscription ID was not found
816 /// - The removal triggers automatic cleanup of the monitoring task
817 ///
818 /// # Thread Safety
819 /// Uses write locks to ensure thread-safe removal from the subscription map.
820 /// The monitoring task uses read locks to check subscription status, creating
821 /// a clean synchronization pattern.
822 fn account_unsubscribe(
823 &self,
824 _meta: Option<Self::Metadata>,
825 subscription: SubscriptionId,
826 ) -> Result<bool> {
827 let removed = if let Ok(mut guard) = self.account_subscription_map.write() {
828 guard.remove(&subscription)
829 } else {
830 log::error!("Failed to acquire write lock on account_subscription_map");
831 None
832 };
833 if removed.is_some() {
834 Ok(true)
835 } else {
836 Err(Error {
837 code: ErrorCode::InvalidParams,
838 message: "Invalid subscription.".into(),
839 data: None,
840 })
841 }
842 }
843
844 fn slot_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<SlotInfo>) {
845 let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
846 let sub_id = SubscriptionId::Number(id as u64);
847 let sink = match subscriber.assign_id(sub_id.clone()) {
848 Ok(sink) => sink,
849 Err(e) => {
850 log::error!("Failed to assign subscription ID: {:?}", e);
851 return;
852 }
853 };
854
855 let slot_active = Arc::clone(&self.slot_subscription_map);
856 let meta = meta.clone();
857
858 let svm_locker = match meta.get_svm_locker() {
859 Ok(locker) => locker,
860 Err(e) => {
861 log::error!("Failed to get SVM locker for slot subscription: {e}");
862 if let Err(e) = sink.notify(Err(e.into())) {
863 log::error!(
864 "Failed to send error notification to client for SVM locker failure: {e}"
865 );
866 }
867 return;
868 }
869 };
870
871 self.tokio_handle.spawn(async move {
872 if let Ok(mut guard) = slot_active.write() {
873 guard.insert(sub_id.clone(), sink);
874 } else {
875 log::error!("Failed to acquire write lock on slot_subscription_map");
876 return;
877 }
878
879 let rx = svm_locker.subscribe_for_slot_updates();
880
881 loop {
882 // if the subscription has been removed, break the loop
883 if let Ok(guard) = slot_active.read() {
884 if guard.get(&sub_id).is_none() {
885 break;
886 }
887 } else {
888 log::error!("Failed to acquire read lock on slot_subscription_map");
889 break;
890 }
891
892 if let Ok(slot_info) = rx.try_recv() {
893 if let Ok(guard) = slot_active.read() {
894 if let Some(sink) = guard.get(&sub_id) {
895 let _ = sink.notify(Ok(slot_info));
896 }
897 }
898 }
899 }
900 });
901 }
902
903 fn slot_unsubscribe(
904 &self,
905 _meta: Option<Self::Metadata>,
906 subscription: SubscriptionId,
907 ) -> Result<bool> {
908 let removed = if let Ok(mut guard) = self.slot_subscription_map.write() {
909 guard.remove(&subscription)
910 } else {
911 log::error!("Failed to acquire write lock on slot_subscription_map");
912 None
913 };
914 if removed.is_some() {
915 Ok(true)
916 } else {
917 Err(Error {
918 code: ErrorCode::InvalidParams,
919 message: "Invalid subscription.".into(),
920 data: None,
921 })
922 }
923 }
924}