surfpool_core/rpc/ws.rs
1use std::{
2 collections::HashMap,
3 str::FromStr,
4 sync::{Arc, RwLock, atomic},
5};
6
7use crossbeam_channel::TryRecvError;
8use jsonrpc_core::{Error, ErrorCode, Result};
9use jsonrpc_derive::rpc;
10use jsonrpc_pubsub::{
11 SubscriptionId,
12 typed::{Sink, Subscriber},
13};
14use solana_account_decoder::{UiAccount, UiAccountEncoding};
15use solana_client::{
16 rpc_config::{RpcSignatureSubscribeConfig, RpcTransactionConfig, RpcTransactionLogsFilter},
17 rpc_filter::RpcFilterType,
18 rpc_response::{
19 ProcessedSignatureResult, ReceivedSignatureResult, RpcKeyedAccount, RpcLogsResponse,
20 RpcResponseContext, RpcSignatureResult,
21 },
22};
23use solana_commitment_config::{CommitmentConfig, CommitmentLevel};
24use solana_pubkey::Pubkey;
25use solana_rpc_client_api::response::{Response as RpcResponse, SlotInfo};
26use solana_signature::Signature;
27use solana_transaction_status::{TransactionConfirmationStatus, UiTransactionEncoding};
28
29use super::{State, SurfnetRpcContext, SurfpoolWebsocketMeta};
30use crate::surfnet::{GetTransactionResult, SignatureSubscriptionType};
31
32/// Configuration for account subscription requests.
33///
34/// This struct defines the parameters that clients can specify when subscribing
35/// to account change notifications through WebSocket connections. It allows customization
36/// of both the commitment level for updates and the encoding format for account data.
37///
38/// ## Fields
39/// - `commitment`: Optional commitment configuration specifying when to send notifications
40/// (processed, confirmed, or finalized). Defaults to the node's default commitment level.
41/// - `encoding`: Optional encoding format for account data serialization (base58, base64, jsonParsed, etc.).
42/// Defaults to base58 encoding if not specified.
43///
44/// ## Usage
45/// Clients can provide this configuration to customize their subscription behavior:
46/// - Set commitment level to control notification timing based on confirmation status
47/// - Set encoding to specify the preferred format for receiving account data
48///
49/// ## Example Usage
50/// ```json
51/// {
52/// "commitment": "confirmed",
53/// "encoding": "base64"
54/// }
55/// ```
56#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
57#[serde(rename_all = "camelCase")]
58pub struct RpcAccountSubscribeConfig {
59 #[serde(flatten)]
60 pub commitment: Option<CommitmentConfig>,
61 pub encoding: Option<UiAccountEncoding>,
62}
63
64#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)]
65#[serde(rename_all = "camelCase")]
66pub struct RpcProgramSubscribeConfig {
67 #[serde(flatten)]
68 pub commitment: Option<CommitmentConfig>,
69 pub encoding: Option<UiAccountEncoding>,
70 pub filters: Option<Vec<RpcFilterType>>,
71}
72
73#[rpc]
74pub trait Rpc {
75 type Metadata;
76
77 /// Subscribe to signature status notifications via WebSocket.
78 ///
79 /// This method allows clients to subscribe to status updates for a specific transaction signature.
80 /// The subscriber will receive notifications when the transaction reaches the desired confirmation level
81 /// or when it's initially received by the network (if configured).
82 ///
83 /// ## Parameters
84 /// - `meta`: WebSocket metadata containing RPC context and connection information.
85 /// - `subscriber`: The subscription sink for sending signature status notifications to the client.
86 /// - `signature_str`: The transaction signature to monitor, as a base-58 encoded string.
87 /// - `config`: Optional configuration specifying commitment level and notification preferences.
88 ///
89 /// ## Returns
90 /// This method does not return a value directly. Instead, it establishes a WebSocket subscription
91 /// that will send `RpcResponse<RpcSignatureResult>` notifications to the subscriber when the
92 /// transaction status changes.
93 ///
94 /// ## Example WebSocket Request
95 /// ```json
96 /// {
97 /// "jsonrpc": "2.0",
98 /// "id": 1,
99 /// "method": "signatureSubscribe",
100 /// "params": [
101 /// "2id3YC2jK9G5Wo2phDx4gJVAew8DcY5NAojnVuao8rkxwPYPe8cSwE5GzhEgJA2y8fVjDEo6iR6ykBvDxrTQrtpb",
102 /// {
103 /// "commitment": "finalized",
104 /// "enableReceivedNotification": false
105 /// }
106 /// ]
107 /// }
108 /// ```
109 ///
110 /// ## Example WebSocket Response (Subscription Confirmation)
111 /// ```json
112 /// {
113 /// "jsonrpc": "2.0",
114 /// "result": 0,
115 /// "id": 1
116 /// }
117 /// ```
118 ///
119 /// ## Example WebSocket Notification
120 /// ```json
121 /// {
122 /// "jsonrpc": "2.0",
123 /// "method": "signatureNotification",
124 /// "params": {
125 /// "result": {
126 /// "context": {
127 /// "slot": 5207624
128 /// },
129 /// "value": {
130 /// "err": null
131 /// }
132 /// },
133 /// "subscription": 0
134 /// }
135 /// }
136 /// ```
137 ///
138 /// ## Notes
139 /// - If the transaction already exists with the desired confirmation status, the subscriber
140 /// will be notified immediately and the subscription will complete.
141 /// - The subscription automatically terminates after sending the first matching notification.
142 /// - Invalid signature formats will cause the subscription to be rejected with an error.
143 /// - Each subscription runs in its own async task for optimal performance.
144 ///
145 /// ## See Also
146 /// - `signatureUnsubscribe`: Remove an active signature subscription
147 /// - `getSignatureStatuses`: Get current status of multiple signatures
148 #[pubsub(
149 subscription = "signatureNotification",
150 subscribe,
151 name = "signatureSubscribe"
152 )]
153 fn signature_subscribe(
154 &self,
155 meta: Self::Metadata,
156 subscriber: Subscriber<RpcResponse<RpcSignatureResult>>,
157 signature_str: String,
158 config: Option<RpcSignatureSubscribeConfig>,
159 );
160
161 /// Unsubscribe from signature status notifications.
162 ///
163 /// This method removes an active signature subscription, stopping further notifications
164 /// for the specified subscription ID.
165 ///
166 /// ## Parameters
167 /// - `meta`: Optional WebSocket metadata containing connection information.
168 /// - `subscription`: The subscription ID to remove, as returned by `signatureSubscribe`.
169 ///
170 /// ## Returns
171 /// A `Result<bool>` indicating whether the unsubscription was successful:
172 /// - `Ok(true)` if the subscription was successfully removed
173 /// - `Err(Error)` with `InvalidParams` if the subscription ID doesn't exist
174 ///
175 /// ## Example WebSocket Request
176 /// ```json
177 /// {
178 /// "jsonrpc": "2.0",
179 /// "id": 1,
180 /// "method": "signatureUnsubscribe",
181 /// "params": [0]
182 /// }
183 /// ```
184 ///
185 /// ## Example WebSocket Response
186 /// ```json
187 /// {
188 /// "jsonrpc": "2.0",
189 /// "result": true,
190 /// "id": 1
191 /// }
192 /// ```
193 ///
194 /// ## Notes
195 /// - Attempting to unsubscribe from a non-existent subscription will return an error.
196 /// - Successfully unsubscribed connections will no longer receive notifications.
197 /// - This method is thread-safe and can be called concurrently.
198 ///
199 /// ## See Also
200 /// - `signatureSubscribe`: Create a signature status subscription
201 #[pubsub(
202 subscription = "signatureNotification",
203 unsubscribe,
204 name = "signatureUnsubscribe"
205 )]
206 fn signature_unsubscribe(
207 &self,
208 meta: Option<Self::Metadata>,
209 subscription: SubscriptionId,
210 ) -> Result<bool>;
211
212 /// Subscribe to account change notifications via WebSocket.
213 ///
214 /// This method allows clients to subscribe to updates for a specific account.
215 /// The subscriber will receive notifications whenever the account's data, lamports balance,
216 /// ownership, or other properties change.
217 ///
218 /// ## Parameters
219 /// - `meta`: WebSocket metadata containing RPC context and connection information.
220 /// - `subscriber`: The subscription sink for sending account update notifications to the client.
221 /// - `pubkey_str`: The account public key to monitor, as a base-58 encoded string.
222 /// - `config`: Optional configuration specifying commitment level and encoding format for account data.
223 ///
224 /// ## Returns
225 /// This method does not return a value directly. Instead, it establishes a continuous WebSocket
226 /// subscription that will send `RpcResponse<UiAccount>` notifications to the subscriber whenever
227 /// the account state changes.
228 ///
229 /// ## Example WebSocket Request
230 /// ```json
231 /// {
232 /// "jsonrpc": "2.0",
233 /// "id": 1,
234 /// "method": "accountSubscribe",
235 /// "params": [
236 /// "CM78CPUeXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNH12",
237 /// {
238 /// "commitment": "finalized",
239 /// "encoding": "base64"
240 /// }
241 /// ]
242 /// }
243 /// ```
244 ///
245 /// ## Example WebSocket Response (Subscription Confirmation)
246 /// ```json
247 /// {
248 /// "jsonrpc": "2.0",
249 /// "result": 23784,
250 /// "id": 1
251 /// }
252 /// ```
253 ///
254 /// ## Example WebSocket Notification
255 /// ```json
256 /// {
257 /// "jsonrpc": "2.0",
258 /// "method": "accountNotification",
259 /// "params": {
260 /// "result": {
261 /// "context": {
262 /// "slot": 5208469
263 /// },
264 /// "value": {
265 /// "data": ["base64EncodedAccountData", "base64"],
266 /// "executable": false,
267 /// "lamports": 33594,
268 /// "owner": "11111111111111111111111111111112",
269 /// "rentEpoch": 636
270 /// }
271 /// },
272 /// "subscription": 23784
273 /// }
274 /// }
275 /// ```
276 ///
277 /// ## Notes
278 /// - The subscription remains active until explicitly unsubscribed or the connection is closed.
279 /// - Account notifications are sent whenever any aspect of the account changes.
280 /// - The encoding format specified in the config determines how account data is serialized.
281 /// - Invalid public key formats will cause the subscription to be rejected with an error.
282 /// - Each subscription runs in its own async task to ensure optimal performance.
283 ///
284 /// ## See Also
285 /// - `accountUnsubscribe`: Remove an active account subscription
286 /// - `getAccountInfo`: Get current account information
287 #[pubsub(
288 subscription = "accountNotification",
289 subscribe,
290 name = "accountSubscribe"
291 )]
292 fn account_subscribe(
293 &self,
294 meta: Self::Metadata,
295 subscriber: Subscriber<RpcResponse<UiAccount>>,
296 pubkey_str: String,
297 config: Option<RpcAccountSubscribeConfig>,
298 );
299
300 /// Unsubscribe from account change notifications.
301 ///
302 /// This method removes an active account subscription, stopping further notifications
303 /// for the specified subscription ID. The monitoring task will automatically terminate
304 /// when the subscription is removed.
305 ///
306 /// ## Parameters
307 /// - `meta`: Optional WebSocket metadata containing connection information.
308 /// - `subscription`: The subscription ID to remove, as returned by `accountSubscribe`.
309 ///
310 /// ## Returns
311 /// A `Result<bool>` indicating whether the unsubscription was successful:
312 /// - `Ok(true)` if the subscription was successfully removed
313 /// - `Err(Error)` with `InvalidParams` if the subscription ID doesn't exist
314 ///
315 /// ## Example WebSocket Request
316 /// ```json
317 /// {
318 /// "jsonrpc": "2.0",
319 /// "id": 1,
320 /// "method": "accountUnsubscribe",
321 /// "params": [23784]
322 /// }
323 /// ```
324 ///
325 /// ## Example WebSocket Response
326 /// ```json
327 /// {
328 /// "jsonrpc": "2.0",
329 /// "result": true,
330 /// "id": 1
331 /// }
332 /// ```
333 ///
334 /// ## Notes
335 /// - Attempting to unsubscribe from a non-existent subscription will return an error.
336 /// - Successfully unsubscribed connections will no longer receive account notifications.
337 /// - The monitoring task automatically detects subscription removal and terminates gracefully.
338 /// - This method is thread-safe and can be called concurrently.
339 ///
340 /// ## See Also
341 /// - `accountSubscribe`: Create an account change subscription
342 #[pubsub(
343 subscription = "accountNotification",
344 unsubscribe,
345 name = "accountUnsubscribe"
346 )]
347 fn account_unsubscribe(
348 &self,
349 meta: Option<Self::Metadata>,
350 subscription: SubscriptionId,
351 ) -> Result<bool>;
352
353 /// Subscribe to slot notifications.
354 ///
355 /// This method allows clients to subscribe to updates for a specific slot.
356 /// The subscriber will receive notifications whenever the slot changes.
357 ///
358 /// ## Parameters
359 /// - `meta`: WebSocket metadata containing RPC context and connection information.
360 /// - `subscriber`: The subscription sink for sending slot update notifications to the client.
361 ///
362 /// ## Returns
363 /// This method does not return a value directly. Instead, it establishes a continuous WebSocket
364 /// subscription that will send `SlotInfo` notifications to the subscriber whenever
365 /// the slot changes.
366 ///
367 /// ## Example WebSocket Request
368 /// ```json
369 /// {
370 /// "jsonrpc": "2.0",
371 /// "id": 1,
372 /// "method": "slotSubscribe",
373 /// "params": [
374 /// {
375 /// "commitment": "finalized"
376 /// }
377 /// ]
378 /// }
379 /// ```
380 ///
381 /// ## Example WebSocket Response (Subscription Confirmation)
382 /// ```json
383 /// {
384 /// "jsonrpc": "2.0",
385 /// "result": 5207624,
386 /// "id": 1
387 /// }
388 /// ```
389 ///
390 /// ## Example WebSocket Notification
391 /// ```json
392 /// {
393 /// "jsonrpc": "2.0",
394 /// "method": "slotNotification",
395 /// "params": {
396 /// "result": {
397 /// "slot": 5207624
398 /// },
399 /// "subscription": 5207624
400 /// }
401 /// }
402 /// ```
403 ///
404 /// ## Notes
405 /// - The subscription remains active until explicitly unsubscribed or the connection is closed.
406 /// - Slot notifications are sent whenever the slot changes.
407 /// - The subscription automatically terminates when the slot changes.
408 /// - Each subscription runs in its own async task for optimal performance.
409 ///
410 /// ## See Also
411 /// - `slotUnsubscribe`: Remove an active slot subscription
412 #[pubsub(subscription = "slotNotification", subscribe, name = "slotSubscribe")]
413 fn slot_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<SlotInfo>);
414
415 /// Unsubscribe from slot notifications.
416 ///
417 /// This method removes an active slot subscription, stopping further notifications
418 /// for the specified subscription ID.
419 ///
420 /// ## Parameters
421 /// - `meta`: Optional WebSocket metadata containing connection information.
422 /// - `subscription`: The subscription ID to remove, as returned by `slotSubscribe`.
423 ///
424 /// ## Returns
425 /// A `Result<bool>` indicating whether the unsubscription was successful:
426 /// - `Ok(true)` if the subscription was successfully removed
427 /// - `Err(Error)` with `InvalidParams` if the subscription ID doesn't exist
428 ///
429 /// ## Example WebSocket Request
430 /// ```json
431 /// {
432 /// "jsonrpc": "2.0",
433 /// "id": 1,
434 /// "method": "slotUnsubscribe",
435 /// "params": [0]
436 /// }
437 /// ```
438 ///
439 /// ## Example WebSocket Response
440 /// ```json
441 /// {
442 /// "jsonrpc": "2.0",
443 /// "result": true,
444 /// "id": 1
445 /// }
446 /// ```
447 ///
448 /// ## Notes
449 /// - Attempting to unsubscribe from a non-existent subscription will return an error.
450 /// - Successfully unsubscribed connections will no longer receive notifications.
451 /// - This method is thread-safe and can be called concurrently.
452 ///
453 /// ## See Also
454 /// - `slotSubscribe`: Create a slot subscription
455 #[pubsub(
456 subscription = "slotNotification",
457 unsubscribe,
458 name = "slotUnsubscribe"
459 )]
460 fn slot_unsubscribe(
461 &self,
462 meta: Option<Self::Metadata>,
463 subscription: SubscriptionId,
464 ) -> Result<bool>;
465
466 /// Subscribe to logs notifications.
467 ///
468 /// This method allows clients to subscribe to transaction log messages
469 /// emitted during transaction execution. It supports filtering by signature,
470 /// account mentions, or all transactions.
471 ///
472 /// ## Parameters
473 /// - `meta`: WebSocket metadata containing RPC context and connection information.
474 /// - `subscriber`: The subscription sink for sending log notifications to the client.
475 /// - `mentions`: Optional filter for the subscription: can be a specific signature, account, or `"all"`.
476 /// - `commitment`: Optional commitment level for filtering logs by block finality.
477 ///
478 /// ## Returns
479 /// This method establishes a continuous WebSocket subscription that streams
480 /// `RpcLogsResponse` notifications as new transactions are processed.
481 ///
482 /// ## Example WebSocket Request
483 /// ```json
484 /// {
485 /// "jsonrpc": "2.0",
486 /// "id": 1,
487 /// "method": "logsSubscribe",
488 /// "params": [
489 /// {
490 /// "mentions": ["11111111111111111111111111111111"]
491 /// },
492 /// {
493 /// "commitment": "finalized"
494 /// }
495 /// ]
496 /// }
497 /// ```
498 ///
499 /// ## Example WebSocket Response (Subscription Confirmation)
500 /// ```json
501 /// {
502 /// "jsonrpc": "2.0",
503 /// "result": 42,
504 /// "id": 1
505 /// }
506 /// ```
507 ///
508 /// ## Example WebSocket Notification
509 /// ```json
510 /// {
511 /// "jsonrpc": "2.0",
512 /// "method": "logsNotification",
513 /// "params": {
514 /// "result": {
515 /// "signature": "3s6n...",
516 /// "err": null,
517 /// "logs": ["Program 111111... invoke [1]", "Program 111111... success"]
518 /// },
519 /// "subscription": 42
520 /// }
521 /// }
522 /// ```
523 ///
524 /// ## Notes
525 /// - The subscription remains active until explicitly unsubscribed or the connection is closed.
526 /// - Each log subscription runs independently and supports filtering.
527 /// - Log messages may be truncated depending on cluster configuration.
528 ///
529 /// ## See Also
530 /// - `logsUnsubscribe`: Remove an active logs subscription.
531 #[pubsub(subscription = "logsNotification", subscribe, name = "logsSubscribe")]
532 fn logs_subscribe(
533 &self,
534 meta: Self::Metadata,
535 subscriber: Subscriber<RpcResponse<RpcLogsResponse>>,
536 mentions: Option<RpcTransactionLogsFilter>,
537 commitment: Option<CommitmentConfig>,
538 );
539
540 /// Unsubscribe from logs notifications.
541 ///
542 /// This method removes an active logs subscription, stopping further notifications
543 /// for the specified subscription ID.
544 ///
545 /// ## Parameters
546 /// - `meta`: Optional WebSocket metadata containing connection information.
547 /// - `subscription`: The subscription ID to remove, as returned by `logsSubscribe`.
548 ///
549 /// ## Returns
550 /// A `Result<bool>` indicating whether the unsubscription was successful:
551 /// - `Ok(true)` if the subscription was successfully removed.
552 /// - `Err(Error)` with `InvalidParams` if the subscription ID is not recognized.
553 ///
554 /// ## Example WebSocket Request
555 /// ```json
556 /// {
557 /// "jsonrpc": "2.0",
558 /// "id": 1,
559 /// "method": "logsUnsubscribe",
560 /// "params": [42]
561 /// }
562 /// ```
563 ///
564 /// ## Example WebSocket Response
565 /// ```json
566 /// {
567 /// "jsonrpc": "2.0",
568 /// "result": true,
569 /// "id": 1
570 /// }
571 /// ```
572 ///
573 /// ## Notes
574 /// - Unsubscribing from a non-existent subscription ID returns an error.
575 /// - Successfully unsubscribed clients will no longer receive logs notifications.
576 /// - This method is thread-safe and may be called concurrently.
577 ///
578 /// ## See Also
579 /// - `logsSubscribe`: Create a logs subscription.
580 #[pubsub(
581 subscription = "logsNotification",
582 unsubscribe,
583 name = "logsUnsubscribe"
584 )]
585 fn logs_unsubscribe(
586 &self,
587 meta: Option<Self::Metadata>,
588 subscription: SubscriptionId,
589 ) -> Result<bool>;
590
591 #[pubsub(subscription = "rootNotification", subscribe, name = "rootSubscribe")]
592 fn root_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<RpcResponse<()>>);
593
594 #[pubsub(
595 subscription = "rootNotification",
596 unsubscribe,
597 name = "rootUnsubscribe"
598 )]
599 fn root_unsubscribe(
600 &self,
601 meta: Option<Self::Metadata>,
602 subscription: SubscriptionId,
603 ) -> Result<bool>;
604
605 /// Subscribe to notifications for all accounts owned by a specific program via WebSocket.
606 ///
607 /// This method allows clients to subscribe to updates for any account whose `owner`
608 /// matches the given program ID. Notifications are sent whenever an account owned by
609 /// the program is created, updated, or deleted.
610 ///
611 /// ## Parameters
612 /// - `meta`: WebSocket metadata containing RPC context and connection information.
613 /// - `subscriber`: The subscription sink for sending program account notifications to the client.
614 /// - `pubkey_str`: The program public key to monitor, as a base-58 encoded string.
615 /// - `config`: Optional configuration specifying commitment level, encoding format, and filters.
616 ///
617 /// ## Returns
618 /// This method does not return a value directly. Instead, it establishes a continuous WebSocket
619 /// subscription that will send `RpcResponse<RpcKeyedAccount>` notifications to the subscriber
620 /// whenever an account owned by the program changes.
621 ///
622 /// ## Filters
623 /// The optional config may include filters to narrow which account updates trigger notifications:
624 /// - `dataSize`: Only notify for accounts with a specific data length (in bytes).
625 /// - `memcmp`: Only notify for accounts whose data matches specific bytes at a given offset.
626 ///
627 /// ## Example WebSocket Request
628 /// ```json
629 /// {
630 /// "jsonrpc": "2.0",
631 /// "id": 1,
632 /// "method": "programSubscribe",
633 /// "params": [
634 /// "11111111111111111111111111111111",
635 /// {
636 /// "encoding": "base64",
637 /// "filters": [
638 /// { "dataSize": 80 }
639 /// ]
640 /// }
641 /// ]
642 /// }
643 /// ```
644 ///
645 /// ## Example WebSocket Response (Subscription Confirmation)
646 /// ```json
647 /// {
648 /// "jsonrpc": "2.0",
649 /// "result": 24040,
650 /// "id": 1
651 /// }
652 /// ```
653 ///
654 /// ## Example WebSocket Notification
655 /// ```json
656 /// {
657 /// "jsonrpc": "2.0",
658 /// "method": "programNotification",
659 /// "params": {
660 /// "result": {
661 /// "context": { "slot": 5208469 },
662 /// "value": {
663 /// "pubkey": "H4vnBqifaSACnKa7acsxstsY1iV1bvJNxsCY7enrd1hq",
664 /// "account": {
665 /// "data": ["base64data", "base64"],
666 /// "executable": false,
667 /// "lamports": 33594,
668 /// "owner": "11111111111111111111111111111111",
669 /// "rentEpoch": 636,
670 /// "space": 36
671 /// }
672 /// }
673 /// },
674 /// "subscription": 24040
675 /// }
676 /// }
677 /// ```
678 ///
679 /// ## Notes
680 /// - The subscription remains active until explicitly unsubscribed or the connection is closed.
681 /// - Notifications include both the account pubkey and the full account data.
682 /// - Invalid public key formats will cause the subscription to be rejected with an error.
683 /// - Each subscription runs in its own async task for optimal performance.
684 ///
685 /// ## See Also
686 /// - `programUnsubscribe`: Remove an active program subscription
687 /// - `getProgramAccounts`: Get current accounts for a program
688 #[pubsub(
689 subscription = "programNotification",
690 subscribe,
691 name = "programSubscribe"
692 )]
693 fn program_subscribe(
694 &self,
695 meta: Self::Metadata,
696 subscriber: Subscriber<RpcResponse<RpcKeyedAccount>>,
697 pubkey_str: String,
698 config: Option<RpcProgramSubscribeConfig>,
699 );
700
701 /// Unsubscribe from program account change notifications.
702 ///
703 /// This method removes an active program subscription, stopping further notifications
704 /// for the specified subscription ID. The monitoring task will automatically terminate
705 /// when the subscription is removed.
706 ///
707 /// ## Parameters
708 /// - `meta`: Optional WebSocket metadata containing connection information.
709 /// - `subscription`: The subscription ID to remove, as returned by `programSubscribe`.
710 ///
711 /// ## Returns
712 /// A `Result<bool>` indicating whether the unsubscription was successful:
713 /// - `Ok(true)` if the subscription was successfully removed
714 /// - `Err(Error)` with `InternalError` if the subscription map lock could not be acquired
715 ///
716 /// ## Example WebSocket Request
717 /// ```json
718 /// {
719 /// "jsonrpc": "2.0",
720 /// "id": 1,
721 /// "method": "programUnsubscribe",
722 /// "params": [24040]
723 /// }
724 /// ```
725 ///
726 /// ## Example WebSocket Response
727 /// ```json
728 /// {
729 /// "jsonrpc": "2.0",
730 /// "result": true,
731 /// "id": 1
732 /// }
733 /// ```
734 ///
735 /// ## Notes
736 /// - Successfully unsubscribed connections will no longer receive program account notifications.
737 /// - The monitoring task automatically detects subscription removal and terminates gracefully.
738 /// - This method is thread-safe and can be called concurrently.
739 ///
740 /// ## See Also
741 /// - `programSubscribe`: Create a program account change subscription
742 #[pubsub(
743 subscription = "programNotification",
744 unsubscribe,
745 name = "programUnsubscribe"
746 )]
747 fn program_unsubscribe(
748 &self,
749 meta: Option<Self::Metadata>,
750 subscription: SubscriptionId,
751 ) -> Result<bool>;
752
753 #[pubsub(
754 subscription = "slotsUpdatesNotification",
755 subscribe,
756 name = "slotsUpdatesSubscribe"
757 )]
758 fn slots_updates_subscribe(
759 &self,
760 meta: Self::Metadata,
761 subscriber: Subscriber<RpcResponse<()>>,
762 );
763
764 #[pubsub(
765 subscription = "slotsUpdatesNotification",
766 unsubscribe,
767 name = "slotsUpdatesUnsubscribe"
768 )]
769 fn slots_updates_unsubscribe(
770 &self,
771 meta: Option<Self::Metadata>,
772 subscription: SubscriptionId,
773 ) -> Result<bool>;
774
775 #[pubsub(subscription = "blockNotification", subscribe, name = "blockSubscribe")]
776 fn block_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<RpcResponse<()>>);
777
778 #[pubsub(
779 subscription = "blockNotification",
780 unsubscribe,
781 name = "blockUnsubscribe"
782 )]
783 fn block_unsubscribe(
784 &self,
785 meta: Option<Self::Metadata>,
786 subscription: SubscriptionId,
787 ) -> Result<bool>;
788
789 #[pubsub(subscription = "voteNotification", subscribe, name = "voteSubscribe")]
790 fn vote_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<RpcResponse<()>>);
791
792 #[pubsub(
793 subscription = "voteNotification",
794 unsubscribe,
795 name = "voteUnsubscribe"
796 )]
797 fn vote_unsubscribe(
798 &self,
799 meta: Option<Self::Metadata>,
800 subscription: SubscriptionId,
801 ) -> Result<bool>;
802
803 /// Subscribe to snapshot import notifications via WebSocket.
804 ///
805 /// This method allows clients to subscribe to real-time updates about snapshot import operations
806 /// from a specific snapshot URL. The subscriber will receive notifications when the snapshot
807 /// is being imported, including progress updates and completion status.
808 ///
809 /// ## Parameters
810 /// - `meta`: WebSocket metadata containing RPC context and connection information.
811 /// - `subscriber`: The subscription sink for sending snapshot import notifications to the client.
812 /// - `snapshot_url`: The URL of the snapshot to import and monitor.
813 ///
814 /// ## Returns
815 /// This method does not return a value directly. Instead, it establishes a continuous WebSocket
816 /// subscription that will send `SnapshotImportNotification` notifications to the subscriber whenever
817 /// the snapshot import operation status changes.
818 ///
819 /// ## Example WebSocket Request
820 /// ```json
821 /// {
822 /// "jsonrpc": "2.0",
823 /// "id": 1,
824 /// "method": "snapshotSubscribe",
825 /// "params": ["https://example.com/snapshot.json"]
826 /// }
827 /// ```
828 ///
829 /// ## Example WebSocket Response (Subscription Confirmation)
830 /// ```json
831 /// {
832 /// "jsonrpc": "2.0",
833 /// "result": 12345,
834 /// "id": 1
835 /// }
836 /// ```
837 ///
838 /// ## Example WebSocket Notification
839 /// ```json
840 /// {
841 /// "jsonrpc": "2.0",
842 /// "method": "snapshotNotification",
843 /// "params": {
844 /// "result": {
845 /// "snapshotId": "snapshot_20240107_123456",
846 /// "status": "InProgress",
847 /// "accountsLoaded": 1500,
848 /// "totalAccounts": 3000,
849 /// "error": null
850 /// },
851 /// "subscription": 12345
852 /// }
853 /// }
854 /// ```
855 ///
856 /// ## Notes
857 /// - The subscription remains active until explicitly unsubscribed or the connection is closed.
858 /// - Multiple clients can subscribe to different snapshot notifications simultaneously.
859 /// - The snapshot URL must be accessible and contain a valid snapshot format.
860 /// - Each subscription runs in its own async task for optimal performance.
861 ///
862 /// ## See Also
863 /// - `snapshotUnsubscribe`: Remove an active snapshot subscription
864 #[pubsub(
865 subscription = "snapshotNotification",
866 subscribe,
867 name = "snapshotSubscribe"
868 )]
869 fn snapshot_subscribe(
870 &self,
871 meta: Self::Metadata,
872 subscriber: Subscriber<crate::surfnet::SnapshotImportNotification>,
873 snapshot_url: String,
874 );
875
876 /// Unsubscribe from snapshot import notifications.
877 ///
878 /// This method removes an active snapshot subscription, stopping further notifications
879 /// for the specified subscription ID.
880 ///
881 /// ## Parameters
882 /// - `meta`: Optional WebSocket metadata containing connection information.
883 /// - `subscription`: The subscription ID to remove, as returned by `snapshotSubscribe`.
884 ///
885 /// ## Returns
886 /// A `Result<bool>` indicating whether the unsubscription was successful:
887 /// - `Ok(true)` if the subscription was successfully removed
888 /// - `Err(Error)` with `InvalidParams` if the subscription ID doesn't exist
889 ///
890 /// ## Example WebSocket Request
891 /// ```json
892 /// {
893 /// "jsonrpc": "2.0",
894 /// "id": 1,
895 /// "method": "snapshotUnsubscribe",
896 /// "params": [12345]
897 /// }
898 /// ```
899 ///
900 /// ## Example WebSocket Response
901 /// ```json
902 /// {
903 /// "jsonrpc": "2.0",
904 /// "result": true,
905 /// "id": 1
906 /// }
907 /// ```
908 ///
909 /// ## Notes
910 /// - Attempting to unsubscribe from a non-existent subscription will return an error.
911 /// - Successfully unsubscribed connections will no longer receive snapshot notifications.
912 /// - This method is thread-safe and can be called concurrently.
913 ///
914 /// ## See Also
915 /// - `snapshotSubscribe`: Create a snapshot import subscription
916 #[pubsub(
917 subscription = "snapshotNotification",
918 unsubscribe,
919 name = "snapshotUnsubscribe"
920 )]
921 fn snapshot_unsubscribe(
922 &self,
923 meta: Option<Self::Metadata>,
924 subscription: SubscriptionId,
925 ) -> Result<bool>;
926}
927
928/// WebSocket RPC server implementation for Surfpool.
929///
930/// This struct manages WebSocket subscriptions for both signature status updates
931/// and account change notifications in the Surfpool environment. It provides a complete
932/// WebSocket RPC interface that allows clients to subscribe to real-time updates
933/// from the Solana Virtual Machine (SVM) and handles the lifecycle of WebSocket connections.
934///
935/// ## Fields
936/// - `uid`: Atomic counter for generating unique subscription IDs across all subscription types.
937/// - `signature_subscription_map`: Thread-safe HashMap containing active signature subscriptions, mapping subscription IDs to their notification sinks.
938/// - `account_subscription_map`: Thread-safe HashMap containing active account subscriptions, mapping subscription IDs to their notification sinks.
939/// - `slot_subscription_map`: Thread-safe HashMap containing active slot subscriptions, mapping subscription IDs to their notification sinks.
940/// - `tokio_handle`: Runtime handle for spawning asynchronous subscription monitoring tasks.
941///
942/// ## Features
943/// - **Concurrent Subscriptions**: Supports multiple simultaneous subscriptions without blocking.
944/// - **Thread Safety**: All subscription management operations are thread-safe using RwLock.
945/// - **Automatic Cleanup**: Subscriptions are automatically cleaned up when completed or unsubscribed.
946/// - **Efficient Monitoring**: Each subscription runs in its own async task for optimal performance.
947/// - **Real-time Updates**: Provides immediate notifications when monitored conditions are met.
948///
949/// ## Usage
950/// This struct implements the `Rpc` trait and is typically used as part of a larger
951/// WebSocket server infrastructure to provide real-time blockchain data to clients.
952///
953/// ## Notes
954/// - Each subscription is assigned a unique numeric ID for tracking and management.
955/// - The struct maintains separate maps for different subscription types to optimize performance.
956/// - All async operations are managed through the provided Tokio runtime handle.
957///
958/// ## See Also
959/// - `Rpc`: The trait interface this struct implements
960/// - `RpcAccountSubscribeConfig`: Configuration options for account subscriptions
961pub struct SurfpoolWsRpc {
962 pub uid: atomic::AtomicUsize,
963 pub signature_subscription_map:
964 Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<RpcSignatureResult>>>>>,
965 pub account_subscription_map:
966 Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<UiAccount>>>>>,
967 pub program_subscription_map:
968 Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<RpcKeyedAccount>>>>>,
969 pub slot_subscription_map: Arc<RwLock<HashMap<SubscriptionId, Sink<SlotInfo>>>>,
970 pub logs_subscription_map:
971 Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<RpcLogsResponse>>>>>,
972 pub snapshot_subscription_map:
973 Arc<RwLock<HashMap<SubscriptionId, Sink<crate::surfnet::SnapshotImportNotification>>>>,
974 pub tokio_handle: tokio::runtime::Handle,
975}
976
977impl Rpc for SurfpoolWsRpc {
978 type Metadata = Option<SurfpoolWebsocketMeta>;
979
980 /// Implementation of signature subscription for WebSocket clients.
981 ///
982 /// This method handles the complete lifecycle of signature subscriptions:
983 /// 1. Validates the provided signature string format
984 /// 2. Determines the subscription type (received vs commitment-based)
985 /// 3. Checks if the transaction already exists in the desired state
986 /// 4. If found and confirmed, immediately notifies the subscriber
987 /// 5. Otherwise, sets up a continuous monitoring loop
988 /// 6. Spawns an async task to handle ongoing subscription management
989 ///
990 /// # Error Handling
991 /// - Rejects subscription with `InvalidParams` for malformed signatures
992 /// - Handles RPC context retrieval failures
993 /// - Manages subscription cleanup on completion or failure
994 ///
995 /// # Concurrency
996 /// Each subscription runs in its own async task, allowing multiple
997 /// concurrent subscriptions without blocking each other.
998 fn signature_subscribe(
999 &self,
1000 meta: Self::Metadata,
1001 subscriber: Subscriber<RpcResponse<RpcSignatureResult>>,
1002 signature_str: String,
1003 config: Option<RpcSignatureSubscribeConfig>,
1004 ) {
1005 let _ = meta
1006 .as_ref()
1007 .map(|m| m.log_debug("Websocket 'signature_subscribe' connection established"));
1008
1009 let signature = match Signature::from_str(&signature_str) {
1010 Ok(sig) => sig,
1011 Err(_) => {
1012 let error = Error {
1013 code: ErrorCode::InvalidParams,
1014 message: "Invalid signature format.".into(),
1015 data: None,
1016 };
1017 if let Err(e) = subscriber.reject(error.clone()) {
1018 log::error!("Failed to reject subscriber: {:?}", e);
1019 }
1020 return;
1021 }
1022 };
1023 let config = config.unwrap_or_default();
1024 let rpc_transaction_config = RpcTransactionConfig {
1025 encoding: Some(UiTransactionEncoding::Json),
1026 commitment: config.commitment,
1027 max_supported_transaction_version: Some(0),
1028 };
1029
1030 let subscription_type = if config.enable_received_notification.unwrap_or(false) {
1031 SignatureSubscriptionType::Received
1032 } else {
1033 SignatureSubscriptionType::Commitment(config.commitment.unwrap_or_default().commitment)
1034 };
1035
1036 let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
1037 let sub_id = SubscriptionId::Number(id as u64);
1038 let sink = match subscriber.assign_id(sub_id.clone()) {
1039 Ok(sink) => sink,
1040 Err(e) => {
1041 log::error!("Failed to assign subscription ID: {:?}", e);
1042 return;
1043 }
1044 };
1045 let active = Arc::clone(&self.signature_subscription_map);
1046 let meta = meta.clone();
1047 self.tokio_handle.spawn(async move {
1048 if let Ok(mut guard) = active.write() {
1049 guard.insert(sub_id.clone(), sink);
1050 } else {
1051 log::error!("Failed to acquire write lock on signature_subscription_map");
1052 return;
1053 }
1054
1055 let SurfnetRpcContext {
1056 svm_locker,
1057 remote_ctx,
1058 } = match meta.get_rpc_context(()) {
1059 Ok(res) => res,
1060 Err(e) => {
1061 log::error!("Failed to get RPC context: {:?}", e);
1062 if let Ok(mut guard) = active.write() {
1063 if let Some(sink) = guard.remove(&sub_id) {
1064 if let Err(e) = sink.notify(Err(e.into())) {
1065 log::error!("Failed to notify client about RPC context error: {e}");
1066 }
1067 }
1068 }
1069 return;
1070 }
1071 };
1072 // get the signature from the SVM to see if it's already been processed
1073 let tx_result = match svm_locker
1074 .get_transaction(
1075 &remote_ctx.map(|(r, _)| r),
1076 &signature,
1077 rpc_transaction_config,
1078 )
1079 .await
1080 {
1081 Ok(res) => res,
1082 Err(e) => {
1083 if let Ok(mut guard) = active.write() {
1084 if let Some(sink) = guard.remove(&sub_id) {
1085 let _ = sink.notify(Err(e.into()));
1086 }
1087 }
1088 return;
1089 }
1090 };
1091
1092 // if we already had the transaction, check if its confirmation status matches the desired status set by the subscription
1093 // if so, notify the user and complete the subscription
1094 // otherwise, subscribe to the transaction updates
1095 if let GetTransactionResult::FoundTransaction(_, _, tx) = tx_result {
1096 match (&subscription_type, tx.confirmation_status) {
1097 (&SignatureSubscriptionType::Received, _)
1098 | (
1099 &SignatureSubscriptionType::Commitment(CommitmentLevel::Processed),
1100 Some(TransactionConfirmationStatus::Processed),
1101 )
1102 | (
1103 &SignatureSubscriptionType::Commitment(CommitmentLevel::Processed),
1104 Some(TransactionConfirmationStatus::Confirmed),
1105 )
1106 | (
1107 &SignatureSubscriptionType::Commitment(CommitmentLevel::Processed),
1108 Some(TransactionConfirmationStatus::Finalized),
1109 )
1110 | (
1111 &SignatureSubscriptionType::Commitment(CommitmentLevel::Confirmed),
1112 Some(TransactionConfirmationStatus::Confirmed),
1113 )
1114 | (
1115 &SignatureSubscriptionType::Commitment(CommitmentLevel::Confirmed),
1116 Some(TransactionConfirmationStatus::Finalized),
1117 )
1118 | (
1119 &SignatureSubscriptionType::Commitment(CommitmentLevel::Finalized),
1120 Some(TransactionConfirmationStatus::Finalized),
1121 ) => {
1122 if let Ok(mut guard) = active.write() {
1123 if let Some(sink) = guard.remove(&sub_id) {
1124 let _ = sink.notify(Ok(RpcResponse {
1125 context: RpcResponseContext::new(tx.slot),
1126 value: RpcSignatureResult::ProcessedSignature(
1127 ProcessedSignatureResult {
1128 err: tx.err.map(|e| e.into()),
1129 },
1130 ),
1131 }));
1132 }
1133 }
1134 return;
1135 }
1136 _ => {}
1137 }
1138 }
1139
1140 // update our surfnet SVM to subscribe to the signature updates
1141 let rx =
1142 svm_locker.subscribe_for_signature_updates(&signature, subscription_type.clone());
1143
1144 loop {
1145 let (slot, some_err) = match rx.try_recv() {
1146 Ok(msg) => msg,
1147 Err(e) => {
1148 match e {
1149 TryRecvError::Empty => {
1150 // no update yet, continue
1151 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1152 continue;
1153 }
1154 TryRecvError::Disconnected => {
1155 warn!(
1156 "Signature subscription channel closed for sub id {:?}",
1157 sub_id
1158 );
1159 // channel closed, exit loop
1160 break;
1161 }
1162 }
1163 }
1164 };
1165
1166 let Ok(mut guard) = active.write() else {
1167 log::error!("Failed to acquire read lock on signature_subscription_map");
1168 break;
1169 };
1170
1171 let Some(sink) = guard.remove(&sub_id) else {
1172 log::error!("Failed to get sink for subscription ID");
1173 break;
1174 };
1175
1176 let res = match subscription_type {
1177 SignatureSubscriptionType::Received => sink.notify(Ok(RpcResponse {
1178 context: RpcResponseContext::new(slot),
1179 value: RpcSignatureResult::ReceivedSignature(
1180 ReceivedSignatureResult::ReceivedSignature,
1181 ),
1182 })),
1183 SignatureSubscriptionType::Commitment(_) => sink.notify(Ok(RpcResponse {
1184 context: RpcResponseContext::new(slot),
1185 value: RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult {
1186 err: some_err.map(|e| e.into()),
1187 }),
1188 })),
1189 };
1190
1191 if guard.is_empty() {
1192 break;
1193 }
1194
1195 if let Err(e) = res {
1196 log::error!("Failed to notify client about account update: {e}");
1197 break;
1198 }
1199 }
1200 });
1201 }
1202
1203 /// Implementation of signature unsubscription for WebSocket clients.
1204 ///
1205 /// This method removes an active signature subscription from the internal
1206 /// tracking maps, effectively stopping further notifications for that subscription.
1207 ///
1208 /// # Implementation Details
1209 /// - Attempts to remove the subscription from the active subscriptions map
1210 /// - Returns success if the subscription existed and was removed
1211 /// - Returns an error if the subscription ID was not found
1212 ///
1213 /// # Thread Safety
1214 /// Uses write locks to ensure thread-safe removal from the subscription map.
1215 fn signature_unsubscribe(
1216 &self,
1217 _meta: Option<Self::Metadata>,
1218 subscription: SubscriptionId,
1219 ) -> Result<bool> {
1220 if let Ok(mut guard) = self.signature_subscription_map.write() {
1221 guard.remove(&subscription);
1222 } else {
1223 log::error!("Failed to acquire write lock on signature_subscription_map");
1224 return Err(Error {
1225 code: ErrorCode::InternalError,
1226 message: "Internal error.".into(),
1227 data: None,
1228 });
1229 };
1230 Ok(true)
1231 }
1232
1233 /// Implementation of account subscription for WebSocket clients.
1234 ///
1235 /// This method handles the complete lifecycle of account subscriptions:
1236 /// 1. Validates the provided public key string format
1237 /// 2. Parses the subscription configuration (commitment and encoding)
1238 /// 3. Generates a unique subscription ID and assigns it to the subscriber
1239 /// 4. Spawns an async task to continuously monitor account changes
1240 /// 5. Sends notifications whenever the account state changes
1241 ///
1242 /// # Monitoring Loop
1243 /// The spawned task runs a continuous loop that:
1244 /// - Checks if the subscription is still active (not unsubscribed)
1245 /// - Polls for account updates from the SVM
1246 /// - Sends notifications to the subscriber when changes occur
1247 /// - Automatically terminates when the subscription is removed
1248 ///
1249 /// # Error Handling
1250 /// - Rejects subscription with `InvalidParams` for malformed public keys
1251 /// - Handles encoding configuration for account data serialization
1252 /// - Manages subscription cleanup through the monitoring loop
1253 ///
1254 /// # Performance
1255 /// Uses efficient polling with minimal CPU overhead and automatic
1256 /// cleanup when subscriptions are no longer needed.
1257 fn account_subscribe(
1258 &self,
1259 meta: Self::Metadata,
1260 subscriber: Subscriber<RpcResponse<UiAccount>>,
1261 pubkey_str: String,
1262 config: Option<RpcAccountSubscribeConfig>,
1263 ) {
1264 let _ = meta
1265 .as_ref()
1266 .map(|m| m.log_debug("Websocket 'account_subscribe' connection established"));
1267
1268 let pubkey = match Pubkey::from_str(&pubkey_str) {
1269 Ok(pk) => pk,
1270 Err(_) => {
1271 let error = Error {
1272 code: ErrorCode::InvalidParams,
1273 message: "Invalid pubkey format.".into(),
1274 data: None,
1275 };
1276 if subscriber.reject(error.clone()).is_err() {
1277 log::error!("Failed to reject subscriber for invalid pubkey format.");
1278 }
1279 return;
1280 }
1281 };
1282
1283 let config = config.unwrap_or(RpcAccountSubscribeConfig {
1284 commitment: None,
1285 encoding: None,
1286 });
1287
1288 let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
1289 let sub_id = SubscriptionId::Number(id as u64);
1290 let sink = match subscriber.assign_id(sub_id.clone()) {
1291 Ok(sink) => sink,
1292 Err(e) => {
1293 log::error!("Failed to assign subscription ID: {:?}", e);
1294 return;
1295 }
1296 };
1297
1298 let account_active = Arc::clone(&self.account_subscription_map);
1299 let meta = meta.clone();
1300 let svm_locker = match meta.get_svm_locker() {
1301 Ok(locker) => locker,
1302 Err(e) => {
1303 log::error!("Failed to get SVM locker for account subscription: {e}");
1304 if let Err(e) = sink.notify(Err(e.into())) {
1305 log::error!(
1306 "Failed to send error notification to client for SVM locker failure: {e}"
1307 );
1308 }
1309 return;
1310 }
1311 };
1312 let slot = svm_locker.with_svm_reader(|svm| svm.get_latest_absolute_slot());
1313
1314 self.tokio_handle.spawn(async move {
1315 if let Ok(mut guard) = account_active.write() {
1316 guard.insert(sub_id.clone(), sink);
1317 } else {
1318 log::error!("Failed to acquire write lock on account_subscription_map");
1319 return;
1320 }
1321
1322 // subscribe to account updates
1323 let rx = svm_locker.subscribe_for_account_updates(&pubkey, config.encoding);
1324
1325 loop {
1326 // if the subscription has been removed, break the loop
1327 if let Ok(guard) = account_active.read() {
1328 if guard.get(&sub_id).is_none() {
1329 break;
1330 }
1331 } else {
1332 log::error!("Failed to acquire read lock on account_subscription_map");
1333 break;
1334 }
1335
1336 let Ok(ui_account) = rx.try_recv() else {
1337 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1338 continue;
1339 };
1340
1341 let Ok(guard) = account_active.read() else {
1342 log::error!("Failed to acquire read lock on account_subscription_map");
1343 break;
1344 };
1345
1346 let Some(sink) = guard.get(&sub_id) else {
1347 log::error!("Failed to get sink for subscription ID");
1348 break;
1349 };
1350
1351 if let Err(e) = sink.notify(Ok(RpcResponse {
1352 context: RpcResponseContext::new(slot),
1353 value: ui_account,
1354 })) {
1355 log::error!("Failed to notify client about account update: {e}");
1356 break;
1357 }
1358 }
1359 });
1360 }
1361
1362 /// Implementation of account unsubscription for WebSocket clients.
1363 ///
1364 /// This method removes an active account subscription from the internal
1365 /// tracking maps, effectively stopping further notifications for that subscription.
1366 /// The monitoring loop in the corresponding subscription task will detect this
1367 /// removal and automatically terminate.
1368 ///
1369 /// # Implementation Details
1370 /// - Attempts to remove the subscription from the account subscriptions map
1371 /// - Returns success if the subscription existed and was removed
1372 /// - Returns an error if the subscription ID was not found
1373 /// - The removal triggers automatic cleanup of the monitoring task
1374 ///
1375 /// # Thread Safety
1376 /// Uses write locks to ensure thread-safe removal from the subscription map.
1377 /// The monitoring task uses read locks to check subscription status, creating
1378 /// a clean synchronization pattern.
1379 fn account_unsubscribe(
1380 &self,
1381 _meta: Option<Self::Metadata>,
1382 subscription: SubscriptionId,
1383 ) -> Result<bool> {
1384 if let Ok(mut guard) = self.account_subscription_map.write() {
1385 guard.remove(&subscription)
1386 } else {
1387 log::error!("Failed to acquire write lock on account_subscription_map");
1388 return Err(Error {
1389 code: ErrorCode::InternalError,
1390 message: "Internal error.".into(),
1391 data: None,
1392 });
1393 };
1394 Ok(true)
1395 }
1396
1397 fn slot_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<SlotInfo>) {
1398 let _ = meta
1399 .as_ref()
1400 .map(|m| m.log_debug("Websocket 'slot_subscribe' connection established"));
1401
1402 let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
1403 let sub_id = SubscriptionId::Number(id as u64);
1404 let sink = match subscriber.assign_id(sub_id.clone()) {
1405 Ok(sink) => sink,
1406 Err(e) => {
1407 log::error!("Failed to assign subscription ID: {:?}", e);
1408 return;
1409 }
1410 };
1411
1412 let slot_active = Arc::clone(&self.slot_subscription_map);
1413 let meta = meta.clone();
1414
1415 let svm_locker = match meta.get_svm_locker() {
1416 Ok(locker) => locker,
1417 Err(e) => {
1418 log::error!("Failed to get SVM locker for slot subscription: {e}");
1419 if let Err(e) = sink.notify(Err(e.into())) {
1420 log::error!(
1421 "Failed to send error notification to client for SVM locker failure: {e}"
1422 );
1423 }
1424 return;
1425 }
1426 };
1427
1428 self.tokio_handle.spawn(async move {
1429 if let Ok(mut guard) = slot_active.write() {
1430 guard.insert(sub_id.clone(), sink);
1431 } else {
1432 log::error!("Failed to acquire write lock on slot_subscription_map");
1433 return;
1434 }
1435
1436 let rx = svm_locker.subscribe_for_slot_updates();
1437
1438 loop {
1439 // if the subscription has been removed, break the loop
1440 if let Ok(guard) = slot_active.read() {
1441 if guard.get(&sub_id).is_none() {
1442 break;
1443 }
1444 } else {
1445 log::error!("Failed to acquire read lock on slot_subscription_map");
1446 break;
1447 }
1448
1449 let Ok(slot_info) = rx.try_recv() else {
1450 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1451 continue;
1452 };
1453
1454 let Ok(guard) = slot_active.read() else {
1455 log::error!("Failed to acquire read lock on slots_subscription_map");
1456 break;
1457 };
1458
1459 let Some(sink) = guard.get(&sub_id) else {
1460 log::error!("Failed to get sink for subscription ID");
1461 break;
1462 };
1463
1464 if let Err(e) = sink.notify(Ok(slot_info)) {
1465 log::error!("Failed to notify client about slots update: {e}");
1466 break;
1467 }
1468 }
1469 });
1470 }
1471
1472 fn slot_unsubscribe(
1473 &self,
1474 _meta: Option<Self::Metadata>,
1475 subscription: SubscriptionId,
1476 ) -> Result<bool> {
1477 if let Ok(mut guard) = self.slot_subscription_map.write() {
1478 guard.remove(&subscription)
1479 } else {
1480 log::error!("Failed to acquire write lock on slot_subscription_map");
1481 return Err(Error {
1482 code: ErrorCode::InternalError,
1483 message: "Internal error.".into(),
1484 data: None,
1485 });
1486 };
1487 Ok(true)
1488 }
1489
1490 fn logs_subscribe(
1491 &self,
1492 meta: Self::Metadata,
1493 subscriber: Subscriber<RpcResponse<RpcLogsResponse>>,
1494 mentions: Option<RpcTransactionLogsFilter>,
1495 commitment: Option<CommitmentConfig>,
1496 ) {
1497 let _ = meta
1498 .as_ref()
1499 .map(|m| m.log_debug("Websocket 'logs_subscribe' connection established"));
1500
1501 let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
1502 let sub_id = SubscriptionId::Number(id as u64);
1503 let sink = match subscriber.assign_id(sub_id.clone()) {
1504 Ok(sink) => sink,
1505 Err(e) => {
1506 log::error!("Failed to assign subscription ID: {:?}", e);
1507 return;
1508 }
1509 };
1510
1511 let mentions = mentions.unwrap_or(RpcTransactionLogsFilter::All);
1512 let commitment = commitment.unwrap_or_default().commitment;
1513
1514 let logs_active = Arc::clone(&self.logs_subscription_map);
1515 let meta = meta.clone();
1516
1517 let svm_locker = match meta.get_svm_locker() {
1518 Ok(locker) => locker,
1519 Err(e) => {
1520 log::error!("Failed to get SVM locker for slot subscription: {e}");
1521 if let Err(e) = sink.notify(Err(e.into())) {
1522 log::error!(
1523 "Failed to send error notification to client for SVM locker failure: {e}"
1524 );
1525 }
1526 return;
1527 }
1528 };
1529
1530 self.tokio_handle.spawn(async move {
1531 if let Ok(mut guard) = logs_active.write() {
1532 guard.insert(sub_id.clone(), sink);
1533 } else {
1534 log::error!("Failed to acquire write lock on slot_subscription_map");
1535 return;
1536 }
1537
1538 let rx = svm_locker.subscribe_for_logs_updates(&commitment, &mentions);
1539
1540 loop {
1541 // if the subscription has been removed, break the loop
1542 if let Ok(guard) = logs_active.read() {
1543 if guard.get(&sub_id).is_none() {
1544 break;
1545 }
1546 } else {
1547 log::error!("Failed to acquire read lock on slot_subscription_map");
1548 break;
1549 }
1550
1551 let Ok((slot, value)) = rx.try_recv() else {
1552 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1553 continue;
1554 };
1555
1556 let Ok(guard) = logs_active.read() else {
1557 log::error!("Failed to acquire read lock on logs_subscription_map");
1558 break;
1559 };
1560
1561 let Some(sink) = guard.get(&sub_id) else {
1562 log::error!("Failed to get sink for subscription ID");
1563 break;
1564 };
1565
1566 if let Err(e) = sink.notify(Ok(RpcResponse {
1567 context: RpcResponseContext::new(slot),
1568 value,
1569 })) {
1570 log::error!("Failed to notify client about logs update: {e}");
1571 break;
1572 }
1573 }
1574 });
1575 }
1576
1577 fn logs_unsubscribe(
1578 &self,
1579 _meta: Option<Self::Metadata>,
1580 subscription: SubscriptionId,
1581 ) -> Result<bool> {
1582 if let Ok(mut guard) = self.logs_subscription_map.write() {
1583 guard.remove(&subscription);
1584 } else {
1585 log::error!("Failed to acquire write lock on logs_subscription_map");
1586 return Err(Error {
1587 code: ErrorCode::InternalError,
1588 message: "Internal error.".into(),
1589 data: None,
1590 });
1591 };
1592 Ok(true)
1593 }
1594
1595 fn root_subscribe(&self, meta: Self::Metadata, _subscriber: Subscriber<RpcResponse<()>>) {
1596 let _ = meta
1597 .as_ref()
1598 .map(|m| m.log_warn("Websocket method 'root_subscribe' is uninmplemented"));
1599 }
1600
1601 fn root_unsubscribe(
1602 &self,
1603 _meta: Option<Self::Metadata>,
1604 _subscription: SubscriptionId,
1605 ) -> Result<bool> {
1606 Ok(true)
1607 }
1608
1609 /// Implementation of program subscription for WebSocket clients.
1610 ///
1611 /// This method handles the complete lifecycle of program subscriptions:
1612 /// 1. Validates the provided program public key string format
1613 /// 2. Parses the subscription configuration (commitment, encoding, and filters)
1614 /// 3. Generates a unique subscription ID and assigns it to the subscriber
1615 /// 4. Spawns an async task to continuously monitor account changes for the program
1616 /// 5. Sends notifications whenever an account owned by the program changes and matches filters
1617 ///
1618 /// # Monitoring Loop
1619 /// The spawned task runs a continuous loop that:
1620 /// - Checks if the subscription is still active (not unsubscribed)
1621 /// - Polls for program account updates from the SVM
1622 /// - Applies configured filters (dataSize, memcmp) before notifying
1623 /// - Sends `RpcKeyedAccount` notifications (including account pubkey) to the subscriber
1624 /// - Automatically terminates when the subscription is removed
1625 ///
1626 /// # Error Handling
1627 /// - Rejects subscription with `InvalidParams` for malformed public keys
1628 /// - Handles encoding configuration for account data serialization
1629 /// - Manages subscription cleanup through the monitoring loop
1630 ///
1631 /// # Performance
1632 /// Uses efficient polling with minimal CPU overhead and automatic
1633 /// cleanup when subscriptions are no longer needed.
1634 fn program_subscribe(
1635 &self,
1636 meta: Self::Metadata,
1637 subscriber: Subscriber<RpcResponse<RpcKeyedAccount>>,
1638 pubkey_str: String,
1639 config: Option<RpcProgramSubscribeConfig>,
1640 ) {
1641 let _ = meta
1642 .as_ref()
1643 .map(|m| m.log_debug("Websocket 'program_subscribe' connection established"));
1644
1645 let program_id = match Pubkey::from_str(&pubkey_str) {
1646 Ok(pk) => pk,
1647 Err(_) => {
1648 let error = Error {
1649 code: ErrorCode::InvalidParams,
1650 message: "Invalid pubkey format.".into(),
1651 data: None,
1652 };
1653 if subscriber.reject(error.clone()).is_err() {
1654 log::error!("Failed to reject subscriber for invalid pubkey format.");
1655 }
1656 return;
1657 }
1658 };
1659
1660 let config = config.unwrap_or_default();
1661
1662 let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
1663 let sub_id = SubscriptionId::Number(id as u64);
1664 let sink = match subscriber.assign_id(sub_id.clone()) {
1665 Ok(sink) => sink,
1666 Err(e) => {
1667 log::error!("Failed to assign subscription ID: {:?}", e);
1668 return;
1669 }
1670 };
1671
1672 let program_active = Arc::clone(&self.program_subscription_map);
1673 let meta = meta.clone();
1674 let svm_locker = match meta.get_svm_locker() {
1675 Ok(locker) => locker,
1676 Err(e) => {
1677 log::error!("Failed to get SVM locker for program subscription: {e}");
1678 if let Err(e) = sink.notify(Err(e.into())) {
1679 log::error!(
1680 "Failed to send error notification to client for SVM locker failure: {e}"
1681 );
1682 }
1683 return;
1684 }
1685 };
1686 let slot = svm_locker.with_svm_reader(|svm| svm.get_latest_absolute_slot());
1687
1688 self.tokio_handle.spawn(async move {
1689 if let Ok(mut guard) = program_active.write() {
1690 guard.insert(sub_id.clone(), sink);
1691 } else {
1692 log::error!("Failed to acquire write lock on program_subscription_map");
1693 return;
1694 }
1695
1696 let rx = svm_locker.subscribe_for_program_updates(
1697 &program_id,
1698 config.encoding,
1699 config.filters,
1700 );
1701
1702 loop {
1703 // if the subscription has been removed, break the loop
1704 if let Ok(guard) = program_active.read() {
1705 if guard.get(&sub_id).is_none() {
1706 break;
1707 }
1708 } else {
1709 log::error!("Failed to acquire read lock on program_subscription_map");
1710 break;
1711 }
1712
1713 let Ok(keyed_account) = rx.try_recv() else {
1714 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1715 continue;
1716 };
1717
1718 let Ok(guard) = program_active.read() else {
1719 log::error!("Failed to acquire read lock on program_subscription_map");
1720 break;
1721 };
1722
1723 let Some(sink) = guard.get(&sub_id) else {
1724 log::error!("Failed to get sink for subscription ID");
1725 break;
1726 };
1727
1728 if let Err(e) = sink.notify(Ok(RpcResponse {
1729 context: RpcResponseContext::new(slot),
1730 value: keyed_account,
1731 })) {
1732 log::error!("Failed to notify client about program account update: {e}");
1733 break;
1734 }
1735 }
1736 });
1737 }
1738
1739 /// Implementation of program unsubscription for WebSocket clients.
1740 ///
1741 /// This method removes an active program subscription from the internal
1742 /// tracking maps, effectively stopping further notifications for that subscription.
1743 /// The monitoring loop in the corresponding subscription task will detect this
1744 /// removal and automatically terminate.
1745 ///
1746 /// # Implementation Details
1747 /// - Attempts to remove the subscription from the program subscriptions map
1748 /// - Returns success if the subscription existed and was removed
1749 /// - Returns an error if the lock could not be acquired
1750 /// - The removal triggers automatic cleanup of the monitoring task
1751 ///
1752 /// # Thread Safety
1753 /// Uses write locks to ensure thread-safe removal from the subscription map.
1754 /// The monitoring task uses read locks to check subscription status, creating
1755 /// a clean synchronization pattern.
1756 fn program_unsubscribe(
1757 &self,
1758 _meta: Option<Self::Metadata>,
1759 subscription: SubscriptionId,
1760 ) -> Result<bool> {
1761 if let Ok(mut guard) = self.program_subscription_map.write() {
1762 guard.remove(&subscription);
1763 } else {
1764 log::error!("Failed to acquire write lock on program_subscription_map");
1765 return Err(Error {
1766 code: ErrorCode::InternalError,
1767 message: "Internal error.".into(),
1768 data: None,
1769 });
1770 };
1771 Ok(true)
1772 }
1773
1774 fn slots_updates_subscribe(
1775 &self,
1776 meta: Self::Metadata,
1777 _subscriber: Subscriber<RpcResponse<()>>,
1778 ) {
1779 let _ = meta
1780 .as_ref()
1781 .map(|m| m.log_warn("Websocket method 'slots_updates_subscribe' is uninmplemented"));
1782 }
1783
1784 fn slots_updates_unsubscribe(
1785 &self,
1786 _meta: Option<Self::Metadata>,
1787 _subscription: SubscriptionId,
1788 ) -> Result<bool> {
1789 Ok(true)
1790 }
1791
1792 fn block_subscribe(&self, meta: Self::Metadata, _subscriber: Subscriber<RpcResponse<()>>) {
1793 let _ = meta
1794 .as_ref()
1795 .map(|m| m.log_warn("Websocket method 'block_subscribe' is uninmplemented"));
1796 }
1797
1798 fn block_unsubscribe(
1799 &self,
1800 _meta: Option<Self::Metadata>,
1801 _subscription: SubscriptionId,
1802 ) -> Result<bool> {
1803 Ok(true)
1804 }
1805
1806 fn vote_subscribe(&self, meta: Self::Metadata, _subscriber: Subscriber<RpcResponse<()>>) {
1807 let _ = meta
1808 .as_ref()
1809 .map(|m| m.log_warn("Websocket method 'vote_subscribe' is uninmplemented"));
1810 }
1811
1812 fn vote_unsubscribe(
1813 &self,
1814 _meta: Option<Self::Metadata>,
1815 _subscription: SubscriptionId,
1816 ) -> Result<bool> {
1817 Ok(true)
1818 }
1819
1820 fn snapshot_subscribe(
1821 &self,
1822 meta: Self::Metadata,
1823 subscriber: Subscriber<crate::surfnet::SnapshotImportNotification>,
1824 snapshot_url: String,
1825 ) {
1826 let _ = meta
1827 .as_ref()
1828 .map(|m| m.log_debug("Websocket 'snapshot_subscribe' connection established"));
1829
1830 // Validate snapshot URL format
1831 if snapshot_url.is_empty() {
1832 let error = Error {
1833 code: ErrorCode::InvalidParams,
1834 message: "Invalid snapshot URL: URL cannot be empty.".into(),
1835 data: None,
1836 };
1837 if let Err(e) = subscriber.reject(error.clone()) {
1838 log::error!("Failed to reject subscriber: {:?}", e);
1839 }
1840 return;
1841 }
1842
1843 let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
1844 let sub_id = SubscriptionId::Number(id as u64);
1845 let sink = match subscriber.assign_id(sub_id.clone()) {
1846 Ok(sink) => sink,
1847 Err(e) => {
1848 log::error!("Failed to assign subscription ID: {:?}", e);
1849 return;
1850 }
1851 };
1852
1853 let snapshot_active = Arc::clone(&self.snapshot_subscription_map);
1854 let meta = meta.clone();
1855
1856 let svm_locker = match meta.get_svm_locker() {
1857 Ok(locker) => locker,
1858 Err(e) => {
1859 log::error!("Failed to get SVM locker for snapshot subscription: {e}");
1860 if let Err(e) = sink.notify(Err(e.into())) {
1861 log::error!(
1862 "Failed to send error notification to client for SVM locker failure: {e}"
1863 );
1864 }
1865 return;
1866 }
1867 };
1868
1869 self.tokio_handle.spawn(async move {
1870 if let Ok(mut guard) = snapshot_active.write() {
1871 guard.insert(sub_id.clone(), sink);
1872 } else {
1873 log::error!("Failed to acquire write lock on snapshot_subscription_map");
1874 return;
1875 }
1876
1877 // Generate a unique snapshot ID for this import operation
1878 let snapshot_id = format!(
1879 "snapshot_{}",
1880 chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0)
1881 );
1882
1883 // Subscribe to snapshot import updates
1884 // The locker will send the Started notification through the channel
1885 let rx = svm_locker.subscribe_for_snapshot_import_updates(&snapshot_url, &snapshot_id);
1886
1887 loop {
1888 // if the subscription has been removed, break the loop
1889 if let Ok(guard) = snapshot_active.read() {
1890 if guard.get(&sub_id).is_none() {
1891 break;
1892 }
1893 } else {
1894 log::error!("Failed to acquire read lock on snapshot_subscription_map");
1895 break;
1896 }
1897
1898 let Ok(notification) = rx.try_recv() else {
1899 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1900 continue;
1901 };
1902
1903 let Ok(guard) = snapshot_active.read() else {
1904 log::error!("Failed to acquire read lock on snapshot_subscription_map");
1905 break;
1906 };
1907
1908 let Some(sink) = guard.get(&sub_id) else {
1909 log::error!("Failed to get sink for subscription ID");
1910 break;
1911 };
1912
1913 if let Err(e) = sink.notify(Ok(notification)) {
1914 log::error!("Failed to notify client about snapshot import update: {e}");
1915 break;
1916 }
1917
1918 // If the import is completed or failed, we can end the subscription
1919 if let Ok(guard) = snapshot_active.read() {
1920 if let Some(_sink) = guard.get(&sub_id) {
1921 // Check if this was the final notification
1922 // We'll determine this by checking the status in the last notification
1923 // For now, we'll keep the subscription alive in case of multiple imports
1924 }
1925 }
1926 }
1927 });
1928 }
1929
1930 fn snapshot_unsubscribe(
1931 &self,
1932 _meta: Option<Self::Metadata>,
1933 subscription: SubscriptionId,
1934 ) -> Result<bool> {
1935 if let Ok(mut guard) = self.snapshot_subscription_map.write() {
1936 guard.remove(&subscription);
1937 } else {
1938 log::error!("Failed to acquire write lock on snapshot_subscription_map");
1939 return Err(Error {
1940 code: ErrorCode::InternalError,
1941 message: "Internal error.".into(),
1942 data: None,
1943 });
1944 };
1945 Ok(true)
1946 }
1947}