surfpool_core/rpc/ws.rs
1use std::{
2 collections::HashMap,
3 str::FromStr,
4 sync::{Arc, RwLock, atomic},
5};
6
7use jsonrpc_core::{Error, ErrorCode, Result};
8use jsonrpc_derive::rpc;
9use jsonrpc_pubsub::{
10 SubscriptionId,
11 typed::{Sink, Subscriber},
12};
13use solana_account_decoder::{UiAccount, UiAccountEncoding};
14use solana_client::{
15 rpc_config::{RpcSignatureSubscribeConfig, RpcTransactionConfig, RpcTransactionLogsFilter},
16 rpc_response::{
17 ProcessedSignatureResult, ReceivedSignatureResult, RpcLogsResponse, RpcResponseContext,
18 RpcSignatureResult,
19 },
20};
21use solana_commitment_config::{CommitmentConfig, CommitmentLevel};
22use solana_pubkey::Pubkey;
23use solana_rpc_client_api::response::{Response as RpcResponse, SlotInfo};
24use solana_signature::Signature;
25use solana_transaction_status::TransactionConfirmationStatus;
26
27use super::{State, SurfnetRpcContext, SurfpoolWebsocketMeta};
28use crate::surfnet::{GetTransactionResult, SignatureSubscriptionType};
29
30/// Configuration for account subscription requests.
31///
32/// This struct defines the parameters that clients can specify when subscribing
33/// to account change notifications through WebSocket connections. It allows customization
34/// of both the commitment level for updates and the encoding format for account data.
35///
36/// ## Fields
37/// - `commitment`: Optional commitment configuration specifying when to send notifications
38/// (processed, confirmed, or finalized). Defaults to the node's default commitment level.
39/// - `encoding`: Optional encoding format for account data serialization (base58, base64, jsonParsed, etc.).
40/// Defaults to base58 encoding if not specified.
41///
42/// ## Usage
43/// Clients can provide this configuration to customize their subscription behavior:
44/// - Set commitment level to control notification timing based on confirmation status
45/// - Set encoding to specify the preferred format for receiving account data
46///
47/// ## Example Usage
48/// ```json
49/// {
50/// "commitment": "confirmed",
51/// "encoding": "base64"
52/// }
53/// ```
54#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
55#[serde(rename_all = "camelCase")]
56pub struct RpcAccountSubscribeConfig {
57 #[serde(flatten)]
58 pub commitment: Option<CommitmentConfig>,
59 pub encoding: Option<UiAccountEncoding>,
60}
61
62#[rpc]
63pub trait Rpc {
64 type Metadata;
65
66 /// Subscribe to signature status notifications via WebSocket.
67 ///
68 /// This method allows clients to subscribe to status updates for a specific transaction signature.
69 /// The subscriber will receive notifications when the transaction reaches the desired confirmation level
70 /// or when it's initially received by the network (if configured).
71 ///
72 /// ## Parameters
73 /// - `meta`: WebSocket metadata containing RPC context and connection information.
74 /// - `subscriber`: The subscription sink for sending signature status notifications to the client.
75 /// - `signature_str`: The transaction signature to monitor, as a base-58 encoded string.
76 /// - `config`: Optional configuration specifying commitment level and notification preferences.
77 ///
78 /// ## Returns
79 /// This method does not return a value directly. Instead, it establishes a WebSocket subscription
80 /// that will send `RpcResponse<RpcSignatureResult>` notifications to the subscriber when the
81 /// transaction status changes.
82 ///
83 /// ## Example WebSocket Request
84 /// ```json
85 /// {
86 /// "jsonrpc": "2.0",
87 /// "id": 1,
88 /// "method": "signatureSubscribe",
89 /// "params": [
90 /// "2id3YC2jK9G5Wo2phDx4gJVAew8DcY5NAojnVuao8rkxwPYPe8cSwE5GzhEgJA2y8fVjDEo6iR6ykBvDxrTQrtpb",
91 /// {
92 /// "commitment": "finalized",
93 /// "enableReceivedNotification": false
94 /// }
95 /// ]
96 /// }
97 /// ```
98 ///
99 /// ## Example WebSocket Response (Subscription Confirmation)
100 /// ```json
101 /// {
102 /// "jsonrpc": "2.0",
103 /// "result": 0,
104 /// "id": 1
105 /// }
106 /// ```
107 ///
108 /// ## Example WebSocket Notification
109 /// ```json
110 /// {
111 /// "jsonrpc": "2.0",
112 /// "method": "signatureNotification",
113 /// "params": {
114 /// "result": {
115 /// "context": {
116 /// "slot": 5207624
117 /// },
118 /// "value": {
119 /// "err": null
120 /// }
121 /// },
122 /// "subscription": 0
123 /// }
124 /// }
125 /// ```
126 ///
127 /// ## Notes
128 /// - If the transaction already exists with the desired confirmation status, the subscriber
129 /// will be notified immediately and the subscription will complete.
130 /// - The subscription automatically terminates after sending the first matching notification.
131 /// - Invalid signature formats will cause the subscription to be rejected with an error.
132 /// - Each subscription runs in its own async task for optimal performance.
133 ///
134 /// ## See Also
135 /// - `signatureUnsubscribe`: Remove an active signature subscription
136 /// - `getSignatureStatuses`: Get current status of multiple signatures
137 #[pubsub(
138 subscription = "signatureNotification",
139 subscribe,
140 name = "signatureSubscribe"
141 )]
142 fn signature_subscribe(
143 &self,
144 meta: Self::Metadata,
145 subscriber: Subscriber<RpcResponse<RpcSignatureResult>>,
146 signature_str: String,
147 config: Option<RpcSignatureSubscribeConfig>,
148 );
149
150 /// Unsubscribe from signature status notifications.
151 ///
152 /// This method removes an active signature subscription, stopping further notifications
153 /// for the specified subscription ID.
154 ///
155 /// ## Parameters
156 /// - `meta`: Optional WebSocket metadata containing connection information.
157 /// - `subscription`: The subscription ID to remove, as returned by `signatureSubscribe`.
158 ///
159 /// ## Returns
160 /// A `Result<bool>` indicating whether the unsubscription was successful:
161 /// - `Ok(true)` if the subscription was successfully removed
162 /// - `Err(Error)` with `InvalidParams` if the subscription ID doesn't exist
163 ///
164 /// ## Example WebSocket Request
165 /// ```json
166 /// {
167 /// "jsonrpc": "2.0",
168 /// "id": 1,
169 /// "method": "signatureUnsubscribe",
170 /// "params": [0]
171 /// }
172 /// ```
173 ///
174 /// ## Example WebSocket Response
175 /// ```json
176 /// {
177 /// "jsonrpc": "2.0",
178 /// "result": true,
179 /// "id": 1
180 /// }
181 /// ```
182 ///
183 /// ## Notes
184 /// - Attempting to unsubscribe from a non-existent subscription will return an error.
185 /// - Successfully unsubscribed connections will no longer receive notifications.
186 /// - This method is thread-safe and can be called concurrently.
187 ///
188 /// ## See Also
189 /// - `signatureSubscribe`: Create a signature status subscription
190 #[pubsub(
191 subscription = "signatureNotification",
192 unsubscribe,
193 name = "signatureUnsubscribe"
194 )]
195 fn signature_unsubscribe(
196 &self,
197 meta: Option<Self::Metadata>,
198 subscription: SubscriptionId,
199 ) -> Result<bool>;
200
201 /// Subscribe to account change notifications via WebSocket.
202 ///
203 /// This method allows clients to subscribe to updates for a specific account.
204 /// The subscriber will receive notifications whenever the account's data, lamports balance,
205 /// ownership, or other properties change.
206 ///
207 /// ## Parameters
208 /// - `meta`: WebSocket metadata containing RPC context and connection information.
209 /// - `subscriber`: The subscription sink for sending account update notifications to the client.
210 /// - `pubkey_str`: The account public key to monitor, as a base-58 encoded string.
211 /// - `config`: Optional configuration specifying commitment level and encoding format for account data.
212 ///
213 /// ## Returns
214 /// This method does not return a value directly. Instead, it establishes a continuous WebSocket
215 /// subscription that will send `RpcResponse<UiAccount>` notifications to the subscriber whenever
216 /// the account state changes.
217 ///
218 /// ## Example WebSocket Request
219 /// ```json
220 /// {
221 /// "jsonrpc": "2.0",
222 /// "id": 1,
223 /// "method": "accountSubscribe",
224 /// "params": [
225 /// "CM78CPUeXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNH12",
226 /// {
227 /// "commitment": "finalized",
228 /// "encoding": "base64"
229 /// }
230 /// ]
231 /// }
232 /// ```
233 ///
234 /// ## Example WebSocket Response (Subscription Confirmation)
235 /// ```json
236 /// {
237 /// "jsonrpc": "2.0",
238 /// "result": 23784,
239 /// "id": 1
240 /// }
241 /// ```
242 ///
243 /// ## Example WebSocket Notification
244 /// ```json
245 /// {
246 /// "jsonrpc": "2.0",
247 /// "method": "accountNotification",
248 /// "params": {
249 /// "result": {
250 /// "context": {
251 /// "slot": 5208469
252 /// },
253 /// "value": {
254 /// "data": ["base64EncodedAccountData", "base64"],
255 /// "executable": false,
256 /// "lamports": 33594,
257 /// "owner": "11111111111111111111111111111112",
258 /// "rentEpoch": 636
259 /// }
260 /// },
261 /// "subscription": 23784
262 /// }
263 /// }
264 /// ```
265 ///
266 /// ## Notes
267 /// - The subscription remains active until explicitly unsubscribed or the connection is closed.
268 /// - Account notifications are sent whenever any aspect of the account changes.
269 /// - The encoding format specified in the config determines how account data is serialized.
270 /// - Invalid public key formats will cause the subscription to be rejected with an error.
271 /// - Each subscription runs in its own async task to ensure optimal performance.
272 ///
273 /// ## See Also
274 /// - `accountUnsubscribe`: Remove an active account subscription
275 /// - `getAccountInfo`: Get current account information
276 #[pubsub(
277 subscription = "accountNotification",
278 subscribe,
279 name = "accountSubscribe"
280 )]
281 fn account_subscribe(
282 &self,
283 meta: Self::Metadata,
284 subscriber: Subscriber<RpcResponse<UiAccount>>,
285 pubkey_str: String,
286 config: Option<RpcAccountSubscribeConfig>,
287 );
288
289 /// Unsubscribe from account change notifications.
290 ///
291 /// This method removes an active account subscription, stopping further notifications
292 /// for the specified subscription ID. The monitoring task will automatically terminate
293 /// when the subscription is removed.
294 ///
295 /// ## Parameters
296 /// - `meta`: Optional WebSocket metadata containing connection information.
297 /// - `subscription`: The subscription ID to remove, as returned by `accountSubscribe`.
298 ///
299 /// ## Returns
300 /// A `Result<bool>` indicating whether the unsubscription was successful:
301 /// - `Ok(true)` if the subscription was successfully removed
302 /// - `Err(Error)` with `InvalidParams` if the subscription ID doesn't exist
303 ///
304 /// ## Example WebSocket Request
305 /// ```json
306 /// {
307 /// "jsonrpc": "2.0",
308 /// "id": 1,
309 /// "method": "accountUnsubscribe",
310 /// "params": [23784]
311 /// }
312 /// ```
313 ///
314 /// ## Example WebSocket Response
315 /// ```json
316 /// {
317 /// "jsonrpc": "2.0",
318 /// "result": true,
319 /// "id": 1
320 /// }
321 /// ```
322 ///
323 /// ## Notes
324 /// - Attempting to unsubscribe from a non-existent subscription will return an error.
325 /// - Successfully unsubscribed connections will no longer receive account notifications.
326 /// - The monitoring task automatically detects subscription removal and terminates gracefully.
327 /// - This method is thread-safe and can be called concurrently.
328 ///
329 /// ## See Also
330 /// - `accountSubscribe`: Create an account change subscription
331 #[pubsub(
332 subscription = "accountNotification",
333 unsubscribe,
334 name = "accountUnsubscribe"
335 )]
336 fn account_unsubscribe(
337 &self,
338 meta: Option<Self::Metadata>,
339 subscription: SubscriptionId,
340 ) -> Result<bool>;
341
342 /// Subscribe to slot notifications.
343 ///
344 /// This method allows clients to subscribe to updates for a specific slot.
345 /// The subscriber will receive notifications whenever the slot changes.
346 ///
347 /// ## Parameters
348 /// - `meta`: WebSocket metadata containing RPC context and connection information.
349 /// - `subscriber`: The subscription sink for sending slot update notifications to the client.
350 ///
351 /// ## Returns
352 /// This method does not return a value directly. Instead, it establishes a continuous WebSocket
353 /// subscription that will send `SlotInfo` notifications to the subscriber whenever
354 /// the slot changes.
355 ///
356 /// ## Example WebSocket Request
357 /// ```json
358 /// {
359 /// "jsonrpc": "2.0",
360 /// "id": 1,
361 /// "method": "slotSubscribe",
362 /// "params": [
363 /// {
364 /// "commitment": "finalized"
365 /// }
366 /// ]
367 /// }
368 /// ```
369 ///
370 /// ## Example WebSocket Response (Subscription Confirmation)
371 /// ```json
372 /// {
373 /// "jsonrpc": "2.0",
374 /// "result": 5207624,
375 /// "id": 1
376 /// }
377 /// ```
378 ///
379 /// ## Example WebSocket Notification
380 /// ```json
381 /// {
382 /// "jsonrpc": "2.0",
383 /// "method": "slotNotification",
384 /// "params": {
385 /// "result": {
386 /// "slot": 5207624
387 /// },
388 /// "subscription": 5207624
389 /// }
390 /// }
391 /// ```
392 ///
393 /// ## Notes
394 /// - The subscription remains active until explicitly unsubscribed or the connection is closed.
395 /// - Slot notifications are sent whenever the slot changes.
396 /// - The subscription automatically terminates when the slot changes.
397 /// - Each subscription runs in its own async task for optimal performance.
398 ///
399 /// ## See Also
400 /// - `slotUnsubscribe`: Remove an active slot subscription
401 #[pubsub(subscription = "slotNotification", subscribe, name = "slotSubscribe")]
402 fn slot_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<SlotInfo>);
403
404 /// Unsubscribe from slot notifications.
405 ///
406 /// This method removes an active slot subscription, stopping further notifications
407 /// for the specified subscription ID.
408 ///
409 /// ## Parameters
410 /// - `meta`: Optional WebSocket metadata containing connection information.
411 /// - `subscription`: The subscription ID to remove, as returned by `slotSubscribe`.
412 ///
413 /// ## Returns
414 /// A `Result<bool>` indicating whether the unsubscription was successful:
415 /// - `Ok(true)` if the subscription was successfully removed
416 /// - `Err(Error)` with `InvalidParams` if the subscription ID doesn't exist
417 ///
418 /// ## Example WebSocket Request
419 /// ```json
420 /// {
421 /// "jsonrpc": "2.0",
422 /// "id": 1,
423 /// "method": "slotUnsubscribe",
424 /// "params": [0]
425 /// }
426 /// ```
427 ///
428 /// ## Example WebSocket Response
429 /// ```json
430 /// {
431 /// "jsonrpc": "2.0",
432 /// "result": true,
433 /// "id": 1
434 /// }
435 /// ```
436 ///
437 /// ## Notes
438 /// - Attempting to unsubscribe from a non-existent subscription will return an error.
439 /// - Successfully unsubscribed connections will no longer receive notifications.
440 /// - This method is thread-safe and can be called concurrently.
441 ///
442 /// ## See Also
443 /// - `slotSubscribe`: Create a slot subscription
444 #[pubsub(
445 subscription = "slotNotification",
446 unsubscribe,
447 name = "slotUnsubscribe"
448 )]
449 fn slot_unsubscribe(
450 &self,
451 meta: Option<Self::Metadata>,
452 subscription: SubscriptionId,
453 ) -> Result<bool>;
454
455 /// Subscribe to logs notifications.
456 ///
457 /// This method allows clients to subscribe to transaction log messages
458 /// emitted during transaction execution. It supports filtering by signature,
459 /// account mentions, or all transactions.
460 ///
461 /// ## Parameters
462 /// - `meta`: WebSocket metadata containing RPC context and connection information.
463 /// - `subscriber`: The subscription sink for sending log notifications to the client.
464 /// - `mentions`: Optional filter for the subscription: can be a specific signature, account, or `"all"`.
465 /// - `commitment`: Optional commitment level for filtering logs by block finality.
466 ///
467 /// ## Returns
468 /// This method establishes a continuous WebSocket subscription that streams
469 /// `RpcLogsResponse` notifications as new transactions are processed.
470 ///
471 /// ## Example WebSocket Request
472 /// ```json
473 /// {
474 /// "jsonrpc": "2.0",
475 /// "id": 1,
476 /// "method": "logsSubscribe",
477 /// "params": [
478 /// {
479 /// "mentions": ["11111111111111111111111111111111"]
480 /// },
481 /// {
482 /// "commitment": "finalized"
483 /// }
484 /// ]
485 /// }
486 /// ```
487 ///
488 /// ## Example WebSocket Response (Subscription Confirmation)
489 /// ```json
490 /// {
491 /// "jsonrpc": "2.0",
492 /// "result": 42,
493 /// "id": 1
494 /// }
495 /// ```
496 ///
497 /// ## Example WebSocket Notification
498 /// ```json
499 /// {
500 /// "jsonrpc": "2.0",
501 /// "method": "logsNotification",
502 /// "params": {
503 /// "result": {
504 /// "signature": "3s6n...",
505 /// "err": null,
506 /// "logs": ["Program 111111... invoke [1]", "Program 111111... success"]
507 /// },
508 /// "subscription": 42
509 /// }
510 /// }
511 /// ```
512 ///
513 /// ## Notes
514 /// - The subscription remains active until explicitly unsubscribed or the connection is closed.
515 /// - Each log subscription runs independently and supports filtering.
516 /// - Log messages may be truncated depending on cluster configuration.
517 ///
518 /// ## See Also
519 /// - `logsUnsubscribe`: Remove an active logs subscription.
520 #[pubsub(subscription = "logsNotification", subscribe, name = "logsSubscribe")]
521 fn logs_subscribe(
522 &self,
523 meta: Self::Metadata,
524 subscriber: Subscriber<RpcResponse<RpcLogsResponse>>,
525 mentions: Option<RpcTransactionLogsFilter>,
526 commitment: Option<CommitmentLevel>,
527 );
528
529 /// Unsubscribe from logs notifications.
530 ///
531 /// This method removes an active logs subscription, stopping further notifications
532 /// for the specified subscription ID.
533 ///
534 /// ## Parameters
535 /// - `meta`: Optional WebSocket metadata containing connection information.
536 /// - `subscription`: The subscription ID to remove, as returned by `logsSubscribe`.
537 ///
538 /// ## Returns
539 /// A `Result<bool>` indicating whether the unsubscription was successful:
540 /// - `Ok(true)` if the subscription was successfully removed.
541 /// - `Err(Error)` with `InvalidParams` if the subscription ID is not recognized.
542 ///
543 /// ## Example WebSocket Request
544 /// ```json
545 /// {
546 /// "jsonrpc": "2.0",
547 /// "id": 1,
548 /// "method": "logsUnsubscribe",
549 /// "params": [42]
550 /// }
551 /// ```
552 ///
553 /// ## Example WebSocket Response
554 /// ```json
555 /// {
556 /// "jsonrpc": "2.0",
557 /// "result": true,
558 /// "id": 1
559 /// }
560 /// ```
561 ///
562 /// ## Notes
563 /// - Unsubscribing from a non-existent subscription ID returns an error.
564 /// - Successfully unsubscribed clients will no longer receive logs notifications.
565 /// - This method is thread-safe and may be called concurrently.
566 ///
567 /// ## See Also
568 /// - `logsSubscribe`: Create a logs subscription.
569 #[pubsub(
570 subscription = "logsNotification",
571 unsubscribe,
572 name = "logsUnsubscribe"
573 )]
574 fn logs_unsubscribe(
575 &self,
576 meta: Option<Self::Metadata>,
577 subscription: SubscriptionId,
578 ) -> Result<bool>;
579}
580
581/// WebSocket RPC server implementation for Surfpool.
582///
583/// This struct manages WebSocket subscriptions for both signature status updates
584/// and account change notifications in the Surfpool environment. It provides a complete
585/// WebSocket RPC interface that allows clients to subscribe to real-time updates
586/// from the Solana Virtual Machine (SVM) and handles the lifecycle of WebSocket connections.
587///
588/// ## Fields
589/// - `uid`: Atomic counter for generating unique subscription IDs across all subscription types.
590/// - `signature_subscription_map`: Thread-safe HashMap containing active signature subscriptions, mapping subscription IDs to their notification sinks.
591/// - `account_subscription_map`: Thread-safe HashMap containing active account subscriptions, mapping subscription IDs to their notification sinks.
592/// - `slot_subscription_map`: Thread-safe HashMap containing active slot subscriptions, mapping subscription IDs to their notification sinks.
593/// - `tokio_handle`: Runtime handle for spawning asynchronous subscription monitoring tasks.
594///
595/// ## Features
596/// - **Concurrent Subscriptions**: Supports multiple simultaneous subscriptions without blocking.
597/// - **Thread Safety**: All subscription management operations are thread-safe using RwLock.
598/// - **Automatic Cleanup**: Subscriptions are automatically cleaned up when completed or unsubscribed.
599/// - **Efficient Monitoring**: Each subscription runs in its own async task for optimal performance.
600/// - **Real-time Updates**: Provides immediate notifications when monitored conditions are met.
601///
602/// ## Usage
603/// This struct implements the `Rpc` trait and is typically used as part of a larger
604/// WebSocket server infrastructure to provide real-time blockchain data to clients.
605///
606/// ## Notes
607/// - Each subscription is assigned a unique numeric ID for tracking and management.
608/// - The struct maintains separate maps for different subscription types to optimize performance.
609/// - All async operations are managed through the provided Tokio runtime handle.
610///
611/// ## See Also
612/// - `Rpc`: The trait interface this struct implements
613/// - `RpcAccountSubscribeConfig`: Configuration options for account subscriptions
614pub struct SurfpoolWsRpc {
615 pub uid: atomic::AtomicUsize,
616 pub signature_subscription_map:
617 Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<RpcSignatureResult>>>>>,
618 pub account_subscription_map:
619 Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<UiAccount>>>>>,
620 pub slot_subscription_map: Arc<RwLock<HashMap<SubscriptionId, Sink<SlotInfo>>>>,
621 pub logs_subscription_map:
622 Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<RpcLogsResponse>>>>>,
623 pub tokio_handle: tokio::runtime::Handle,
624}
625
626impl Rpc for SurfpoolWsRpc {
627 type Metadata = Option<SurfpoolWebsocketMeta>;
628
629 /// Implementation of signature subscription for WebSocket clients.
630 ///
631 /// This method handles the complete lifecycle of signature subscriptions:
632 /// 1. Validates the provided signature string format
633 /// 2. Determines the subscription type (received vs commitment-based)
634 /// 3. Checks if the transaction already exists in the desired state
635 /// 4. If found and confirmed, immediately notifies the subscriber
636 /// 5. Otherwise, sets up a continuous monitoring loop
637 /// 6. Spawns an async task to handle ongoing subscription management
638 ///
639 /// # Error Handling
640 /// - Rejects subscription with `InvalidParams` for malformed signatures
641 /// - Handles RPC context retrieval failures
642 /// - Manages subscription cleanup on completion or failure
643 ///
644 /// # Concurrency
645 /// Each subscription runs in its own async task, allowing multiple
646 /// concurrent subscriptions without blocking each other.
647 fn signature_subscribe(
648 &self,
649 meta: Self::Metadata,
650 subscriber: Subscriber<RpcResponse<RpcSignatureResult>>,
651 signature_str: String,
652 config: Option<RpcSignatureSubscribeConfig>,
653 ) {
654 let signature = match Signature::from_str(&signature_str) {
655 Ok(sig) => sig,
656 Err(_) => {
657 let error = Error {
658 code: ErrorCode::InvalidParams,
659 message: "Invalid signature format.".into(),
660 data: None,
661 };
662 if let Err(e) = subscriber.reject(error.clone()) {
663 log::error!("Failed to reject subscriber: {:?}", e);
664 }
665 return;
666 }
667 };
668 let config = config.unwrap_or_default();
669 let subscription_type = if config.enable_received_notification.unwrap_or(false) {
670 SignatureSubscriptionType::Received
671 } else {
672 SignatureSubscriptionType::Commitment(config.commitment.unwrap_or_default().commitment)
673 };
674
675 let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
676 let sub_id = SubscriptionId::Number(id as u64);
677 let sink = match subscriber.assign_id(sub_id.clone()) {
678 Ok(sink) => sink,
679 Err(e) => {
680 log::error!("Failed to assign subscription ID: {:?}", e);
681 return;
682 }
683 };
684 let active = Arc::clone(&self.signature_subscription_map);
685 let meta = meta.clone();
686
687 self.tokio_handle.spawn(async move {
688 if let Ok(mut guard) = active.write() {
689 guard.insert(sub_id.clone(), sink);
690 } else {
691 log::error!("Failed to acquire write lock on signature_subscription_map");
692 return;
693 }
694
695 let SurfnetRpcContext {
696 svm_locker,
697 remote_ctx,
698 } = match meta.get_rpc_context(()) {
699 Ok(res) => res,
700 Err(e) => {
701 log::error!("Failed to get RPC context: {:?}", e);
702 if let Ok(mut guard) = active.write() {
703 if let Some(sink) = guard.remove(&sub_id) {
704 if let Err(e) = sink.notify(Err(e.into())) {
705 log::error!("Failed to notify client about RPC context error: {e}");
706 }
707 }
708 }
709 return;
710 }
711 };
712 // get the signature from the SVM to see if it's already been processed
713 let tx_result = match svm_locker
714 .get_transaction(
715 &remote_ctx.map(|(r, _)| r),
716 &signature,
717 RpcTransactionConfig::default(),
718 )
719 .await
720 {
721 Ok(res) => res,
722 Err(e) => {
723 if let Ok(mut guard) = active.write() {
724 if let Some(sink) = guard.remove(&sub_id) {
725 let _ = sink.notify(Err(e.into()));
726 }
727 }
728 return;
729 }
730 };
731
732 // if we already had the transaction, check if its confirmation status matches the desired status set by the subscription
733 // if so, notify the user and complete the subscription
734 // otherwise, subscribe to the transaction updates
735 if let GetTransactionResult::FoundTransaction(_, _, tx) = tx_result {
736 match (&subscription_type, tx.confirmation_status) {
737 (&SignatureSubscriptionType::Received, _)
738 | (
739 &SignatureSubscriptionType::Commitment(CommitmentLevel::Processed),
740 Some(TransactionConfirmationStatus::Processed),
741 )
742 | (
743 &SignatureSubscriptionType::Commitment(CommitmentLevel::Confirmed),
744 Some(TransactionConfirmationStatus::Confirmed),
745 )
746 | (
747 &SignatureSubscriptionType::Commitment(CommitmentLevel::Finalized),
748 Some(TransactionConfirmationStatus::Finalized),
749 ) => {
750 if let Ok(mut guard) = active.write() {
751 if let Some(sink) = guard.remove(&sub_id) {
752 let _ = sink.notify(Ok(RpcResponse {
753 context: RpcResponseContext::new(tx.slot),
754 value: RpcSignatureResult::ProcessedSignature(
755 ProcessedSignatureResult { err: None },
756 ),
757 }));
758 }
759 }
760 return;
761 }
762 _ => {}
763 }
764 }
765
766 // update our surfnet SVM to subscribe to the signature updates
767 let rx =
768 svm_locker.subscribe_for_signature_updates(&signature, subscription_type.clone());
769
770 loop {
771 if let Ok((slot, some_err)) = rx.try_recv() {
772 if let Ok(mut guard) = active.write() {
773 if let Some(sink) = guard.remove(&sub_id) {
774 match subscription_type {
775 SignatureSubscriptionType::Received => {
776 let _ = sink.notify(Ok(RpcResponse {
777 context: RpcResponseContext::new(slot),
778 value: RpcSignatureResult::ReceivedSignature(
779 ReceivedSignatureResult::ReceivedSignature,
780 ),
781 }));
782 }
783 SignatureSubscriptionType::Commitment(_) => {
784 let _ = sink.notify(Ok(RpcResponse {
785 context: RpcResponseContext::new(slot),
786 value: RpcSignatureResult::ProcessedSignature(
787 ProcessedSignatureResult { err: some_err },
788 ),
789 }));
790 }
791 }
792 }
793 }
794 return;
795 }
796 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
797 }
798 });
799 }
800
801 /// Implementation of signature unsubscription for WebSocket clients.
802 ///
803 /// This method removes an active signature subscription from the internal
804 /// tracking maps, effectively stopping further notifications for that subscription.
805 ///
806 /// # Implementation Details
807 /// - Attempts to remove the subscription from the active subscriptions map
808 /// - Returns success if the subscription existed and was removed
809 /// - Returns an error if the subscription ID was not found
810 ///
811 /// # Thread Safety
812 /// Uses write locks to ensure thread-safe removal from the subscription map.
813 fn signature_unsubscribe(
814 &self,
815 _meta: Option<Self::Metadata>,
816 subscription: SubscriptionId,
817 ) -> Result<bool> {
818 let removed = if let Ok(mut guard) = self.signature_subscription_map.write() {
819 guard.remove(&subscription)
820 } else {
821 log::error!("Failed to acquire write lock on signature_subscription_map");
822 None
823 };
824 if removed.is_some() {
825 Ok(true)
826 } else {
827 Err(Error {
828 code: ErrorCode::InvalidParams,
829 message: "Invalid subscription.".into(),
830 data: None,
831 })
832 }
833 }
834
835 /// Implementation of account subscription for WebSocket clients.
836 ///
837 /// This method handles the complete lifecycle of account subscriptions:
838 /// 1. Validates the provided public key string format
839 /// 2. Parses the subscription configuration (commitment and encoding)
840 /// 3. Generates a unique subscription ID and assigns it to the subscriber
841 /// 4. Spawns an async task to continuously monitor account changes
842 /// 5. Sends notifications whenever the account state changes
843 ///
844 /// # Monitoring Loop
845 /// The spawned task runs a continuous loop that:
846 /// - Checks if the subscription is still active (not unsubscribed)
847 /// - Polls for account updates from the SVM
848 /// - Sends notifications to the subscriber when changes occur
849 /// - Automatically terminates when the subscription is removed
850 ///
851 /// # Error Handling
852 /// - Rejects subscription with `InvalidParams` for malformed public keys
853 /// - Handles encoding configuration for account data serialization
854 /// - Manages subscription cleanup through the monitoring loop
855 ///
856 /// # Performance
857 /// Uses efficient polling with minimal CPU overhead and automatic
858 /// cleanup when subscriptions are no longer needed.
859 fn account_subscribe(
860 &self,
861 meta: Self::Metadata,
862 subscriber: Subscriber<RpcResponse<UiAccount>>,
863 pubkey_str: String,
864 config: Option<RpcAccountSubscribeConfig>,
865 ) {
866 let pubkey = match Pubkey::from_str(&pubkey_str) {
867 Ok(pk) => pk,
868 Err(_) => {
869 let error = Error {
870 code: ErrorCode::InvalidParams,
871 message: "Invalid pubkey format.".into(),
872 data: None,
873 };
874 if subscriber.reject(error.clone()).is_err() {
875 log::error!("Failed to reject subscriber for invalid pubkey format.");
876 }
877 return;
878 }
879 };
880
881 let config = config.unwrap_or(RpcAccountSubscribeConfig {
882 commitment: None,
883 encoding: None,
884 });
885
886 let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
887 let sub_id = SubscriptionId::Number(id as u64);
888 let sink = match subscriber.assign_id(sub_id.clone()) {
889 Ok(sink) => sink,
890 Err(e) => {
891 log::error!("Failed to assign subscription ID: {:?}", e);
892 return;
893 }
894 };
895
896 let account_active = Arc::clone(&self.account_subscription_map);
897 let meta = meta.clone();
898 let svm_locker = match meta.get_svm_locker() {
899 Ok(locker) => locker,
900 Err(e) => {
901 log::error!("Failed to get SVM locker for account subscription: {e}");
902 if let Err(e) = sink.notify(Err(e.into())) {
903 log::error!(
904 "Failed to send error notification to client for SVM locker failure: {e}"
905 );
906 }
907 return;
908 }
909 };
910 let slot = svm_locker.with_svm_reader(|svm| svm.get_latest_absolute_slot());
911
912 self.tokio_handle.spawn(async move {
913 if let Ok(mut guard) = account_active.write() {
914 guard.insert(sub_id.clone(), sink);
915 } else {
916 log::error!("Failed to acquire write lock on account_subscription_map");
917 return;
918 }
919
920 // subscribe to account updates
921 let rx = svm_locker.subscribe_for_account_updates(&pubkey, config.encoding);
922
923 loop {
924 // if the subscription has been removed, break the loop
925 if let Ok(guard) = account_active.read() {
926 if guard.get(&sub_id).is_none() {
927 break;
928 }
929 } else {
930 log::error!("Failed to acquire read lock on account_subscription_map");
931 break;
932 }
933
934 if let Ok(ui_account) = rx.try_recv() {
935 if let Ok(guard) = account_active.read() {
936 if let Some(sink) = guard.get(&sub_id) {
937 let _ = sink.notify(Ok(RpcResponse {
938 context: RpcResponseContext::new(slot),
939 value: ui_account,
940 }));
941 }
942 }
943 }
944 }
945 });
946 }
947
948 /// Implementation of account unsubscription for WebSocket clients.
949 ///
950 /// This method removes an active account subscription from the internal
951 /// tracking maps, effectively stopping further notifications for that subscription.
952 /// The monitoring loop in the corresponding subscription task will detect this
953 /// removal and automatically terminate.
954 ///
955 /// # Implementation Details
956 /// - Attempts to remove the subscription from the account subscriptions map
957 /// - Returns success if the subscription existed and was removed
958 /// - Returns an error if the subscription ID was not found
959 /// - The removal triggers automatic cleanup of the monitoring task
960 ///
961 /// # Thread Safety
962 /// Uses write locks to ensure thread-safe removal from the subscription map.
963 /// The monitoring task uses read locks to check subscription status, creating
964 /// a clean synchronization pattern.
965 fn account_unsubscribe(
966 &self,
967 _meta: Option<Self::Metadata>,
968 subscription: SubscriptionId,
969 ) -> Result<bool> {
970 let removed = if let Ok(mut guard) = self.account_subscription_map.write() {
971 guard.remove(&subscription)
972 } else {
973 log::error!("Failed to acquire write lock on account_subscription_map");
974 None
975 };
976 if removed.is_some() {
977 Ok(true)
978 } else {
979 Err(Error {
980 code: ErrorCode::InvalidParams,
981 message: "Invalid subscription.".into(),
982 data: None,
983 })
984 }
985 }
986
987 fn slot_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<SlotInfo>) {
988 let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
989 let sub_id = SubscriptionId::Number(id as u64);
990 let sink = match subscriber.assign_id(sub_id.clone()) {
991 Ok(sink) => sink,
992 Err(e) => {
993 log::error!("Failed to assign subscription ID: {:?}", e);
994 return;
995 }
996 };
997
998 let slot_active = Arc::clone(&self.slot_subscription_map);
999 let meta = meta.clone();
1000
1001 let svm_locker = match meta.get_svm_locker() {
1002 Ok(locker) => locker,
1003 Err(e) => {
1004 log::error!("Failed to get SVM locker for slot subscription: {e}");
1005 if let Err(e) = sink.notify(Err(e.into())) {
1006 log::error!(
1007 "Failed to send error notification to client for SVM locker failure: {e}"
1008 );
1009 }
1010 return;
1011 }
1012 };
1013
1014 self.tokio_handle.spawn(async move {
1015 if let Ok(mut guard) = slot_active.write() {
1016 guard.insert(sub_id.clone(), sink);
1017 } else {
1018 log::error!("Failed to acquire write lock on slot_subscription_map");
1019 return;
1020 }
1021
1022 let rx = svm_locker.subscribe_for_slot_updates();
1023
1024 loop {
1025 // if the subscription has been removed, break the loop
1026 if let Ok(guard) = slot_active.read() {
1027 if guard.get(&sub_id).is_none() {
1028 break;
1029 }
1030 } else {
1031 log::error!("Failed to acquire read lock on slot_subscription_map");
1032 break;
1033 }
1034
1035 if let Ok(slot_info) = rx.try_recv() {
1036 if let Ok(guard) = slot_active.read() {
1037 if let Some(sink) = guard.get(&sub_id) {
1038 let _ = sink.notify(Ok(slot_info));
1039 }
1040 }
1041 }
1042 }
1043 });
1044 }
1045
1046 fn slot_unsubscribe(
1047 &self,
1048 _meta: Option<Self::Metadata>,
1049 subscription: SubscriptionId,
1050 ) -> Result<bool> {
1051 let removed = if let Ok(mut guard) = self.slot_subscription_map.write() {
1052 guard.remove(&subscription)
1053 } else {
1054 log::error!("Failed to acquire write lock on slot_subscription_map");
1055 None
1056 };
1057 if removed.is_some() {
1058 Ok(true)
1059 } else {
1060 Err(Error {
1061 code: ErrorCode::InvalidParams,
1062 message: "Invalid subscription.".into(),
1063 data: None,
1064 })
1065 }
1066 }
1067
1068 fn logs_subscribe(
1069 &self,
1070 meta: Self::Metadata,
1071 subscriber: Subscriber<RpcResponse<RpcLogsResponse>>,
1072 mentions: Option<RpcTransactionLogsFilter>,
1073 commitment: Option<CommitmentLevel>,
1074 ) {
1075 let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
1076 let sub_id = SubscriptionId::Number(id as u64);
1077 let sink = match subscriber.assign_id(sub_id.clone()) {
1078 Ok(sink) => sink,
1079 Err(e) => {
1080 log::error!("Failed to assign subscription ID: {:?}", e);
1081 return;
1082 }
1083 };
1084
1085 let mentions = mentions.unwrap_or(RpcTransactionLogsFilter::All);
1086 let commitment = commitment.unwrap_or(CommitmentLevel::Confirmed);
1087
1088 let logs_active = Arc::clone(&self.logs_subscription_map);
1089 let meta = meta.clone();
1090
1091 let svm_locker = match meta.get_svm_locker() {
1092 Ok(locker) => locker,
1093 Err(e) => {
1094 log::error!("Failed to get SVM locker for slot subscription: {e}");
1095 if let Err(e) = sink.notify(Err(e.into())) {
1096 log::error!(
1097 "Failed to send error notification to client for SVM locker failure: {e}"
1098 );
1099 }
1100 return;
1101 }
1102 };
1103
1104 self.tokio_handle.spawn(async move {
1105 if let Ok(mut guard) = logs_active.write() {
1106 guard.insert(sub_id.clone(), sink);
1107 } else {
1108 log::error!("Failed to acquire write lock on slot_subscription_map");
1109 return;
1110 }
1111
1112 let rx = svm_locker.subscribe_for_logs_updates(&commitment, &mentions);
1113
1114 loop {
1115 // if the subscription has been removed, break the loop
1116 if let Ok(guard) = logs_active.read() {
1117 if guard.get(&sub_id).is_none() {
1118 break;
1119 }
1120 } else {
1121 log::error!("Failed to acquire read lock on slot_subscription_map");
1122 break;
1123 }
1124
1125 if let Ok((slot, value)) = rx.try_recv() {
1126 if let Ok(guard) = logs_active.read() {
1127 if let Some(sink) = guard.get(&sub_id) {
1128 let _ = sink.notify(Ok(RpcResponse {
1129 context: RpcResponseContext::new(slot),
1130 value,
1131 }));
1132 }
1133 }
1134 }
1135 }
1136 });
1137 }
1138
1139 fn logs_unsubscribe(
1140 &self,
1141 _meta: Option<Self::Metadata>,
1142 subscription: SubscriptionId,
1143 ) -> Result<bool> {
1144 let removed = if let Ok(mut guard) = self.logs_subscription_map.write() {
1145 guard.remove(&subscription)
1146 } else {
1147 log::error!("Failed to acquire write lock on logs_subscription_map");
1148 None
1149 };
1150 if removed.is_some() {
1151 Ok(true)
1152 } else {
1153 Err(Error {
1154 code: ErrorCode::InvalidParams,
1155 message: "Invalid subscription.".into(),
1156 data: None,
1157 })
1158 }
1159 }
1160}