surfpool_core/rpc/
ws.rs

1use std::{
2    collections::HashMap,
3    str::FromStr,
4    sync::{atomic, Arc, RwLock},
5};
6
7use jsonrpc_core::{Error, ErrorCode, Result};
8use jsonrpc_derive::rpc;
9use jsonrpc_pubsub::{
10    typed::{Sink, Subscriber},
11    SubscriptionId,
12};
13use solana_client::{
14    rpc_config::RpcSignatureSubscribeConfig,
15    rpc_response::{
16        ProcessedSignatureResult, ReceivedSignatureResult, RpcResponseContext, RpcSignatureResult,
17    },
18};
19use solana_commitment_config::CommitmentLevel;
20use solana_rpc_client_api::response::Response as RpcResponse;
21use solana_signature::Signature;
22use solana_transaction_status::TransactionConfirmationStatus;
23
24use super::{State, SurfnetRpcContext, SurfpoolWebsocketMeta};
25use crate::surfnet::{locker::SvmAccessContext, GetTransactionResult, SignatureSubscriptionType};
26
27#[rpc]
28pub trait Rpc {
29    type Metadata;
30
31    #[pubsub(
32        subscription = "signatureNotification",
33        subscribe,
34        name = "signatureSubscribe"
35    )]
36    fn signature_subscribe(
37        &self,
38        meta: Self::Metadata,
39        subscriber: Subscriber<RpcResponse<RpcSignatureResult>>,
40        signature_str: String,
41        config: Option<RpcSignatureSubscribeConfig>,
42    );
43
44    #[pubsub(
45        subscription = "signatureNotification",
46        unsubscribe,
47        name = "signatureUnsubscribe"
48    )]
49    fn signature_unsubscribe(
50        &self,
51        meta: Option<Self::Metadata>,
52        subscription: SubscriptionId,
53    ) -> Result<bool>;
54}
55
56pub struct SurfpoolWsRpc {
57    pub uid: atomic::AtomicUsize,
58    pub active: Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<RpcSignatureResult>>>>>,
59    pub tokio_handle: tokio::runtime::Handle,
60}
61impl Rpc for SurfpoolWsRpc {
62    type Metadata = Option<SurfpoolWebsocketMeta>;
63
64    fn signature_subscribe(
65        &self,
66        meta: Self::Metadata,
67        subscriber: Subscriber<RpcResponse<RpcSignatureResult>>,
68        signature_str: String,
69        config: Option<RpcSignatureSubscribeConfig>,
70    ) {
71        let signature = match Signature::from_str(&signature_str) {
72            Ok(sig) => sig,
73            Err(_) => {
74                let error = Error {
75                    code: ErrorCode::InvalidParams,
76                    message: "Invalid signature format.".into(),
77                    data: None,
78                };
79                subscriber.reject(error.clone()).unwrap();
80                return;
81            }
82        };
83        let config = config.unwrap_or_default();
84        let subscription_type = if config.enable_received_notification.unwrap_or(false) {
85            SignatureSubscriptionType::Received
86        } else {
87            SignatureSubscriptionType::Commitment(config.commitment.unwrap_or_default().commitment)
88        };
89
90        let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
91        let sub_id = SubscriptionId::Number(id as u64);
92        let sink = subscriber
93            .assign_id(sub_id.clone())
94            .expect("Failed to assign subscription ID");
95
96        let active = Arc::clone(&self.active);
97        let meta = meta.clone();
98
99        self.tokio_handle.spawn(async move {
100            active.write().unwrap().insert(sub_id.clone(), sink);
101
102            let SurfnetRpcContext {
103                svm_locker,
104                remote_ctx,
105            } = match meta.get_rpc_context(None) {
106                Ok(res) => res,
107                Err(_) => panic!(),
108            };
109
110            // get the signature from the SVM to see if it's already been processed
111            let SvmAccessContext {
112                inner: tx_result, ..
113            } = svm_locker.get_transaction(&remote_ctx, &signature).await;
114
115            // if we already had the transaction, check if its confirmation status matches the desired status set by the subscription
116            // if so, notify the user and complete the subscription
117            // otherwise, subscribe to the transaction updates
118            if let GetTransactionResult::FoundTransaction(_, _, tx) = tx_result {
119                match (&subscription_type, tx.confirmation_status) {
120                    (&SignatureSubscriptionType::Received, _)
121                    | (
122                        &SignatureSubscriptionType::Commitment(CommitmentLevel::Processed),
123                        Some(TransactionConfirmationStatus::Processed),
124                    )
125                    | (
126                        &SignatureSubscriptionType::Commitment(CommitmentLevel::Confirmed),
127                        Some(TransactionConfirmationStatus::Confirmed),
128                    )
129                    | (
130                        &SignatureSubscriptionType::Commitment(CommitmentLevel::Finalized),
131                        Some(TransactionConfirmationStatus::Finalized),
132                    ) => {
133                        if let Some(sink) = active.write().unwrap().remove(&sub_id) {
134                            let _ = sink.notify(Ok(RpcResponse {
135                                context: RpcResponseContext::new(tx.slot),
136                                value: RpcSignatureResult::ProcessedSignature(
137                                    ProcessedSignatureResult { err: None },
138                                ),
139                            }));
140                        }
141                        return;
142                    }
143                    _ => {}
144                }
145            }
146
147            // update our surfnet SVM to subscribe to the signature updates
148            let rx =
149                svm_locker.subscribe_for_signature_updates(&signature, subscription_type.clone());
150
151            loop {
152                if let Ok((slot, some_err)) = rx.try_recv() {
153                    if let Some(sink) = active.write().unwrap().remove(&sub_id) {
154                        match subscription_type {
155                            SignatureSubscriptionType::Received => {
156                                let _ = sink.notify(Ok(RpcResponse {
157                                    context: RpcResponseContext::new(slot),
158                                    value: RpcSignatureResult::ReceivedSignature(
159                                        ReceivedSignatureResult::ReceivedSignature,
160                                    ),
161                                }));
162                            }
163                            SignatureSubscriptionType::Commitment(_) => {
164                                let _ = sink.notify(Ok(RpcResponse {
165                                    context: RpcResponseContext::new(slot),
166                                    value: RpcSignatureResult::ProcessedSignature(
167                                        ProcessedSignatureResult { err: some_err },
168                                    ),
169                                }));
170                            }
171                        }
172                    }
173                    return;
174                }
175                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
176            }
177        });
178    }
179
180    fn signature_unsubscribe(
181        &self,
182        _meta: Option<Self::Metadata>,
183        subscription: SubscriptionId,
184    ) -> Result<bool> {
185        let removed = self.active.write().unwrap().remove(&subscription);
186        if removed.is_some() {
187            Ok(true)
188        } else {
189            Err(Error {
190                code: ErrorCode::InvalidParams,
191                message: "Invalid subscription.".into(),
192                data: None,
193            })
194        }
195    }
196}