1use std::{
2 collections::HashMap,
3 str::FromStr,
4 sync::{atomic, Arc, RwLock},
5};
6
7use jsonrpc_core::{Error, ErrorCode, Result};
8use jsonrpc_derive::rpc;
9use jsonrpc_pubsub::{
10 typed::{Sink, Subscriber},
11 SubscriptionId,
12};
13use solana_client::{
14 rpc_config::RpcSignatureSubscribeConfig,
15 rpc_response::{
16 ProcessedSignatureResult, ReceivedSignatureResult, RpcResponseContext, RpcSignatureResult,
17 },
18};
19use solana_commitment_config::CommitmentLevel;
20use solana_rpc_client_api::response::Response as RpcResponse;
21use solana_signature::Signature;
22use solana_transaction_status::TransactionConfirmationStatus;
23
24use super::{State, SurfnetRpcContext, SurfpoolWebsocketMeta};
25use crate::surfnet::{locker::SvmAccessContext, GetTransactionResult, SignatureSubscriptionType};
26
27#[rpc]
28pub trait Rpc {
29 type Metadata;
30
31 #[pubsub(
32 subscription = "signatureNotification",
33 subscribe,
34 name = "signatureSubscribe"
35 )]
36 fn signature_subscribe(
37 &self,
38 meta: Self::Metadata,
39 subscriber: Subscriber<RpcResponse<RpcSignatureResult>>,
40 signature_str: String,
41 config: Option<RpcSignatureSubscribeConfig>,
42 );
43
44 #[pubsub(
45 subscription = "signatureNotification",
46 unsubscribe,
47 name = "signatureUnsubscribe"
48 )]
49 fn signature_unsubscribe(
50 &self,
51 meta: Option<Self::Metadata>,
52 subscription: SubscriptionId,
53 ) -> Result<bool>;
54}
55
56pub struct SurfpoolWsRpc {
57 pub uid: atomic::AtomicUsize,
58 pub active: Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<RpcSignatureResult>>>>>,
59 pub tokio_handle: tokio::runtime::Handle,
60}
61impl Rpc for SurfpoolWsRpc {
62 type Metadata = Option<SurfpoolWebsocketMeta>;
63
64 fn signature_subscribe(
65 &self,
66 meta: Self::Metadata,
67 subscriber: Subscriber<RpcResponse<RpcSignatureResult>>,
68 signature_str: String,
69 config: Option<RpcSignatureSubscribeConfig>,
70 ) {
71 let signature = match Signature::from_str(&signature_str) {
72 Ok(sig) => sig,
73 Err(_) => {
74 let error = Error {
75 code: ErrorCode::InvalidParams,
76 message: "Invalid signature format.".into(),
77 data: None,
78 };
79 subscriber.reject(error.clone()).unwrap();
80 return;
81 }
82 };
83 let config = config.unwrap_or_default();
84 let subscription_type = if config.enable_received_notification.unwrap_or(false) {
85 SignatureSubscriptionType::Received
86 } else {
87 SignatureSubscriptionType::Commitment(config.commitment.unwrap_or_default().commitment)
88 };
89
90 let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
91 let sub_id = SubscriptionId::Number(id as u64);
92 let sink = subscriber
93 .assign_id(sub_id.clone())
94 .expect("Failed to assign subscription ID");
95
96 let active = Arc::clone(&self.active);
97 let meta = meta.clone();
98
99 self.tokio_handle.spawn(async move {
100 active.write().unwrap().insert(sub_id.clone(), sink);
101
102 let SurfnetRpcContext {
103 svm_locker,
104 remote_ctx,
105 } = match meta.get_rpc_context(None) {
106 Ok(res) => res,
107 Err(_) => panic!(),
108 };
109
110 let SvmAccessContext {
112 inner: tx_result, ..
113 } = svm_locker.get_transaction(&remote_ctx, &signature).await;
114
115 if let GetTransactionResult::FoundTransaction(_, _, tx) = tx_result {
119 match (&subscription_type, tx.confirmation_status) {
120 (&SignatureSubscriptionType::Received, _)
121 | (
122 &SignatureSubscriptionType::Commitment(CommitmentLevel::Processed),
123 Some(TransactionConfirmationStatus::Processed),
124 )
125 | (
126 &SignatureSubscriptionType::Commitment(CommitmentLevel::Confirmed),
127 Some(TransactionConfirmationStatus::Confirmed),
128 )
129 | (
130 &SignatureSubscriptionType::Commitment(CommitmentLevel::Finalized),
131 Some(TransactionConfirmationStatus::Finalized),
132 ) => {
133 if let Some(sink) = active.write().unwrap().remove(&sub_id) {
134 let _ = sink.notify(Ok(RpcResponse {
135 context: RpcResponseContext::new(tx.slot),
136 value: RpcSignatureResult::ProcessedSignature(
137 ProcessedSignatureResult { err: None },
138 ),
139 }));
140 }
141 return;
142 }
143 _ => {}
144 }
145 }
146
147 let rx =
149 svm_locker.subscribe_for_signature_updates(&signature, subscription_type.clone());
150
151 loop {
152 if let Ok((slot, some_err)) = rx.try_recv() {
153 if let Some(sink) = active.write().unwrap().remove(&sub_id) {
154 match subscription_type {
155 SignatureSubscriptionType::Received => {
156 let _ = sink.notify(Ok(RpcResponse {
157 context: RpcResponseContext::new(slot),
158 value: RpcSignatureResult::ReceivedSignature(
159 ReceivedSignatureResult::ReceivedSignature,
160 ),
161 }));
162 }
163 SignatureSubscriptionType::Commitment(_) => {
164 let _ = sink.notify(Ok(RpcResponse {
165 context: RpcResponseContext::new(slot),
166 value: RpcSignatureResult::ProcessedSignature(
167 ProcessedSignatureResult { err: some_err },
168 ),
169 }));
170 }
171 }
172 }
173 return;
174 }
175 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
176 }
177 });
178 }
179
180 fn signature_unsubscribe(
181 &self,
182 _meta: Option<Self::Metadata>,
183 subscription: SubscriptionId,
184 ) -> Result<bool> {
185 let removed = self.active.write().unwrap().remove(&subscription);
186 if removed.is_some() {
187 Ok(true)
188 } else {
189 Err(Error {
190 code: ErrorCode::InvalidParams,
191 message: "Invalid subscription.".into(),
192 data: None,
193 })
194 }
195 }
196}