use std::{
collections::HashMap,
str::FromStr,
sync::{Arc, RwLock, atomic},
};
use crossbeam_channel::TryRecvError;
use jsonrpc_core::{Error, ErrorCode, Result};
use jsonrpc_derive::rpc;
use jsonrpc_pubsub::{
SubscriptionId,
typed::{Sink, Subscriber},
};
use solana_account_decoder::{UiAccount, UiAccountEncoding};
use solana_client::{
rpc_config::{RpcSignatureSubscribeConfig, RpcTransactionConfig, RpcTransactionLogsFilter},
rpc_filter::RpcFilterType,
rpc_response::{
ProcessedSignatureResult, ReceivedSignatureResult, RpcKeyedAccount, RpcLogsResponse,
RpcResponseContext, RpcSignatureResult,
},
};
use solana_commitment_config::{CommitmentConfig, CommitmentLevel};
use solana_pubkey::Pubkey;
use solana_rpc_client_api::response::{Response as RpcResponse, SlotInfo};
use solana_signature::Signature;
use solana_transaction_status::{TransactionConfirmationStatus, UiTransactionEncoding};
use super::{State, SurfnetRpcContext, SurfpoolWebsocketMeta};
use crate::surfnet::{GetTransactionResult, SignatureSubscriptionType};
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RpcAccountSubscribeConfig {
#[serde(flatten)]
pub commitment: Option<CommitmentConfig>,
pub encoding: Option<UiAccountEncoding>,
}
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct RpcProgramSubscribeConfig {
#[serde(flatten)]
pub commitment: Option<CommitmentConfig>,
pub encoding: Option<UiAccountEncoding>,
pub filters: Option<Vec<RpcFilterType>>,
}
#[rpc]
pub trait Rpc {
type Metadata;
#[pubsub(
subscription = "signatureNotification",
subscribe,
name = "signatureSubscribe"
)]
fn signature_subscribe(
&self,
meta: Self::Metadata,
subscriber: Subscriber<RpcResponse<RpcSignatureResult>>,
signature_str: String,
config: Option<RpcSignatureSubscribeConfig>,
);
#[pubsub(
subscription = "signatureNotification",
unsubscribe,
name = "signatureUnsubscribe"
)]
fn signature_unsubscribe(
&self,
meta: Option<Self::Metadata>,
subscription: SubscriptionId,
) -> Result<bool>;
#[pubsub(
subscription = "accountNotification",
subscribe,
name = "accountSubscribe"
)]
fn account_subscribe(
&self,
meta: Self::Metadata,
subscriber: Subscriber<RpcResponse<UiAccount>>,
pubkey_str: String,
config: Option<RpcAccountSubscribeConfig>,
);
#[pubsub(
subscription = "accountNotification",
unsubscribe,
name = "accountUnsubscribe"
)]
fn account_unsubscribe(
&self,
meta: Option<Self::Metadata>,
subscription: SubscriptionId,
) -> Result<bool>;
#[pubsub(subscription = "slotNotification", subscribe, name = "slotSubscribe")]
fn slot_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<SlotInfo>);
#[pubsub(
subscription = "slotNotification",
unsubscribe,
name = "slotUnsubscribe"
)]
fn slot_unsubscribe(
&self,
meta: Option<Self::Metadata>,
subscription: SubscriptionId,
) -> Result<bool>;
#[pubsub(subscription = "logsNotification", subscribe, name = "logsSubscribe")]
fn logs_subscribe(
&self,
meta: Self::Metadata,
subscriber: Subscriber<RpcResponse<RpcLogsResponse>>,
mentions: Option<RpcTransactionLogsFilter>,
commitment: Option<CommitmentConfig>,
);
#[pubsub(
subscription = "logsNotification",
unsubscribe,
name = "logsUnsubscribe"
)]
fn logs_unsubscribe(
&self,
meta: Option<Self::Metadata>,
subscription: SubscriptionId,
) -> Result<bool>;
#[pubsub(subscription = "rootNotification", subscribe, name = "rootSubscribe")]
fn root_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<RpcResponse<()>>);
#[pubsub(
subscription = "rootNotification",
unsubscribe,
name = "rootUnsubscribe"
)]
fn root_unsubscribe(
&self,
meta: Option<Self::Metadata>,
subscription: SubscriptionId,
) -> Result<bool>;
#[pubsub(
subscription = "programNotification",
subscribe,
name = "programSubscribe"
)]
fn program_subscribe(
&self,
meta: Self::Metadata,
subscriber: Subscriber<RpcResponse<RpcKeyedAccount>>,
pubkey_str: String,
config: Option<RpcProgramSubscribeConfig>,
);
#[pubsub(
subscription = "programNotification",
unsubscribe,
name = "programUnsubscribe"
)]
fn program_unsubscribe(
&self,
meta: Option<Self::Metadata>,
subscription: SubscriptionId,
) -> Result<bool>;
#[pubsub(
subscription = "slotsUpdatesNotification",
subscribe,
name = "slotsUpdatesSubscribe"
)]
fn slots_updates_subscribe(
&self,
meta: Self::Metadata,
subscriber: Subscriber<RpcResponse<()>>,
);
#[pubsub(
subscription = "slotsUpdatesNotification",
unsubscribe,
name = "slotsUpdatesUnsubscribe"
)]
fn slots_updates_unsubscribe(
&self,
meta: Option<Self::Metadata>,
subscription: SubscriptionId,
) -> Result<bool>;
#[pubsub(subscription = "blockNotification", subscribe, name = "blockSubscribe")]
fn block_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<RpcResponse<()>>);
#[pubsub(
subscription = "blockNotification",
unsubscribe,
name = "blockUnsubscribe"
)]
fn block_unsubscribe(
&self,
meta: Option<Self::Metadata>,
subscription: SubscriptionId,
) -> Result<bool>;
#[pubsub(subscription = "voteNotification", subscribe, name = "voteSubscribe")]
fn vote_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<RpcResponse<()>>);
#[pubsub(
subscription = "voteNotification",
unsubscribe,
name = "voteUnsubscribe"
)]
fn vote_unsubscribe(
&self,
meta: Option<Self::Metadata>,
subscription: SubscriptionId,
) -> Result<bool>;
#[pubsub(
subscription = "snapshotNotification",
subscribe,
name = "snapshotSubscribe"
)]
fn snapshot_subscribe(
&self,
meta: Self::Metadata,
subscriber: Subscriber<crate::surfnet::SnapshotImportNotification>,
snapshot_url: String,
);
#[pubsub(
subscription = "snapshotNotification",
unsubscribe,
name = "snapshotUnsubscribe"
)]
fn snapshot_unsubscribe(
&self,
meta: Option<Self::Metadata>,
subscription: SubscriptionId,
) -> Result<bool>;
}
pub struct SurfpoolWsRpc {
pub uid: atomic::AtomicUsize,
pub signature_subscription_map:
Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<RpcSignatureResult>>>>>,
pub account_subscription_map:
Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<UiAccount>>>>>,
pub program_subscription_map:
Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<RpcKeyedAccount>>>>>,
pub slot_subscription_map: Arc<RwLock<HashMap<SubscriptionId, Sink<SlotInfo>>>>,
pub logs_subscription_map:
Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<RpcLogsResponse>>>>>,
pub snapshot_subscription_map:
Arc<RwLock<HashMap<SubscriptionId, Sink<crate::surfnet::SnapshotImportNotification>>>>,
pub tokio_handle: tokio::runtime::Handle,
}
impl Rpc for SurfpoolWsRpc {
type Metadata = Option<SurfpoolWebsocketMeta>;
fn signature_subscribe(
&self,
meta: Self::Metadata,
subscriber: Subscriber<RpcResponse<RpcSignatureResult>>,
signature_str: String,
config: Option<RpcSignatureSubscribeConfig>,
) {
let _ = meta
.as_ref()
.map(|m| m.log_debug("Websocket 'signature_subscribe' connection established"));
let signature = match Signature::from_str(&signature_str) {
Ok(sig) => sig,
Err(_) => {
let error = Error {
code: ErrorCode::InvalidParams,
message: "Invalid signature format.".into(),
data: None,
};
if let Err(e) = subscriber.reject(error.clone()) {
log::error!("Failed to reject subscriber: {:?}", e);
}
return;
}
};
let config = config.unwrap_or_default();
let rpc_transaction_config = RpcTransactionConfig {
encoding: Some(UiTransactionEncoding::Json),
commitment: config.commitment,
max_supported_transaction_version: Some(0),
};
let subscription_type = if config.enable_received_notification.unwrap_or(false) {
SignatureSubscriptionType::Received
} else {
SignatureSubscriptionType::Commitment(config.commitment.unwrap_or_default().commitment)
};
let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
let sub_id = SubscriptionId::Number(id as u64);
let sink = match subscriber.assign_id(sub_id.clone()) {
Ok(sink) => sink,
Err(e) => {
log::error!("Failed to assign subscription ID: {:?}", e);
return;
}
};
let active = Arc::clone(&self.signature_subscription_map);
let meta = meta.clone();
self.tokio_handle.spawn(async move {
if let Ok(mut guard) = active.write() {
guard.insert(sub_id.clone(), sink);
} else {
log::error!("Failed to acquire write lock on signature_subscription_map");
return;
}
let SurfnetRpcContext {
svm_locker,
remote_ctx,
} = match meta.get_rpc_context(()) {
Ok(res) => res,
Err(e) => {
log::error!("Failed to get RPC context: {:?}", e);
if let Ok(mut guard) = active.write() {
if let Some(sink) = guard.remove(&sub_id) {
if let Err(e) = sink.notify(Err(e.into())) {
log::error!("Failed to notify client about RPC context error: {e}");
}
}
}
return;
}
};
let tx_result = match svm_locker
.get_transaction(
&remote_ctx.map(|(r, _)| r),
&signature,
rpc_transaction_config,
)
.await
{
Ok(res) => res,
Err(e) => {
if let Ok(mut guard) = active.write() {
if let Some(sink) = guard.remove(&sub_id) {
let _ = sink.notify(Err(e.into()));
}
}
return;
}
};
if let GetTransactionResult::FoundTransaction(_, _, tx) = tx_result {
match (&subscription_type, tx.confirmation_status) {
(&SignatureSubscriptionType::Received, _)
| (
&SignatureSubscriptionType::Commitment(CommitmentLevel::Processed),
Some(TransactionConfirmationStatus::Processed),
)
| (
&SignatureSubscriptionType::Commitment(CommitmentLevel::Processed),
Some(TransactionConfirmationStatus::Confirmed),
)
| (
&SignatureSubscriptionType::Commitment(CommitmentLevel::Processed),
Some(TransactionConfirmationStatus::Finalized),
)
| (
&SignatureSubscriptionType::Commitment(CommitmentLevel::Confirmed),
Some(TransactionConfirmationStatus::Confirmed),
)
| (
&SignatureSubscriptionType::Commitment(CommitmentLevel::Confirmed),
Some(TransactionConfirmationStatus::Finalized),
)
| (
&SignatureSubscriptionType::Commitment(CommitmentLevel::Finalized),
Some(TransactionConfirmationStatus::Finalized),
) => {
if let Ok(mut guard) = active.write() {
if let Some(sink) = guard.remove(&sub_id) {
let _ = sink.notify(Ok(RpcResponse {
context: RpcResponseContext::new(tx.slot),
value: RpcSignatureResult::ProcessedSignature(
ProcessedSignatureResult {
err: tx.err.map(|e| e.into()),
},
),
}));
}
}
return;
}
_ => {}
}
}
let rx =
svm_locker.subscribe_for_signature_updates(&signature, subscription_type.clone());
loop {
let (slot, some_err) = match rx.try_recv() {
Ok(msg) => msg,
Err(e) => {
match e {
TryRecvError::Empty => {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
continue;
}
TryRecvError::Disconnected => {
warn!(
"Signature subscription channel closed for sub id {:?}",
sub_id
);
break;
}
}
}
};
let Ok(mut guard) = active.write() else {
log::error!("Failed to acquire read lock on signature_subscription_map");
break;
};
let Some(sink) = guard.remove(&sub_id) else {
log::error!("Failed to get sink for subscription ID");
break;
};
let res = match subscription_type {
SignatureSubscriptionType::Received => sink.notify(Ok(RpcResponse {
context: RpcResponseContext::new(slot),
value: RpcSignatureResult::ReceivedSignature(
ReceivedSignatureResult::ReceivedSignature,
),
})),
SignatureSubscriptionType::Commitment(_) => sink.notify(Ok(RpcResponse {
context: RpcResponseContext::new(slot),
value: RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult {
err: some_err.map(|e| e.into()),
}),
})),
};
if guard.is_empty() {
break;
}
if let Err(e) = res {
log::error!("Failed to notify client about account update: {e}");
break;
}
}
});
}
fn signature_unsubscribe(
&self,
_meta: Option<Self::Metadata>,
subscription: SubscriptionId,
) -> Result<bool> {
if let Ok(mut guard) = self.signature_subscription_map.write() {
guard.remove(&subscription);
} else {
log::error!("Failed to acquire write lock on signature_subscription_map");
return Err(Error {
code: ErrorCode::InternalError,
message: "Internal error.".into(),
data: None,
});
};
Ok(true)
}
fn account_subscribe(
&self,
meta: Self::Metadata,
subscriber: Subscriber<RpcResponse<UiAccount>>,
pubkey_str: String,
config: Option<RpcAccountSubscribeConfig>,
) {
let _ = meta
.as_ref()
.map(|m| m.log_debug("Websocket 'account_subscribe' connection established"));
let pubkey = match Pubkey::from_str(&pubkey_str) {
Ok(pk) => pk,
Err(_) => {
let error = Error {
code: ErrorCode::InvalidParams,
message: "Invalid pubkey format.".into(),
data: None,
};
if subscriber.reject(error.clone()).is_err() {
log::error!("Failed to reject subscriber for invalid pubkey format.");
}
return;
}
};
let config = config.unwrap_or(RpcAccountSubscribeConfig {
commitment: None,
encoding: None,
});
let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
let sub_id = SubscriptionId::Number(id as u64);
let sink = match subscriber.assign_id(sub_id.clone()) {
Ok(sink) => sink,
Err(e) => {
log::error!("Failed to assign subscription ID: {:?}", e);
return;
}
};
let account_active = Arc::clone(&self.account_subscription_map);
let meta = meta.clone();
let svm_locker = match meta.get_svm_locker() {
Ok(locker) => locker,
Err(e) => {
log::error!("Failed to get SVM locker for account subscription: {e}");
if let Err(e) = sink.notify(Err(e.into())) {
log::error!(
"Failed to send error notification to client for SVM locker failure: {e}"
);
}
return;
}
};
let slot = svm_locker.with_svm_reader(|svm| svm.get_latest_absolute_slot());
self.tokio_handle.spawn(async move {
if let Ok(mut guard) = account_active.write() {
guard.insert(sub_id.clone(), sink);
} else {
log::error!("Failed to acquire write lock on account_subscription_map");
return;
}
let rx = svm_locker.subscribe_for_account_updates(&pubkey, config.encoding);
loop {
if let Ok(guard) = account_active.read() {
if guard.get(&sub_id).is_none() {
break;
}
} else {
log::error!("Failed to acquire read lock on account_subscription_map");
break;
}
let Ok(ui_account) = rx.try_recv() else {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
continue;
};
let Ok(guard) = account_active.read() else {
log::error!("Failed to acquire read lock on account_subscription_map");
break;
};
let Some(sink) = guard.get(&sub_id) else {
log::error!("Failed to get sink for subscription ID");
break;
};
if let Err(e) = sink.notify(Ok(RpcResponse {
context: RpcResponseContext::new(slot),
value: ui_account,
})) {
log::error!("Failed to notify client about account update: {e}");
break;
}
}
});
}
fn account_unsubscribe(
&self,
_meta: Option<Self::Metadata>,
subscription: SubscriptionId,
) -> Result<bool> {
if let Ok(mut guard) = self.account_subscription_map.write() {
guard.remove(&subscription)
} else {
log::error!("Failed to acquire write lock on account_subscription_map");
return Err(Error {
code: ErrorCode::InternalError,
message: "Internal error.".into(),
data: None,
});
};
Ok(true)
}
fn slot_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<SlotInfo>) {
let _ = meta
.as_ref()
.map(|m| m.log_debug("Websocket 'slot_subscribe' connection established"));
let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
let sub_id = SubscriptionId::Number(id as u64);
let sink = match subscriber.assign_id(sub_id.clone()) {
Ok(sink) => sink,
Err(e) => {
log::error!("Failed to assign subscription ID: {:?}", e);
return;
}
};
let slot_active = Arc::clone(&self.slot_subscription_map);
let meta = meta.clone();
let svm_locker = match meta.get_svm_locker() {
Ok(locker) => locker,
Err(e) => {
log::error!("Failed to get SVM locker for slot subscription: {e}");
if let Err(e) = sink.notify(Err(e.into())) {
log::error!(
"Failed to send error notification to client for SVM locker failure: {e}"
);
}
return;
}
};
self.tokio_handle.spawn(async move {
if let Ok(mut guard) = slot_active.write() {
guard.insert(sub_id.clone(), sink);
} else {
log::error!("Failed to acquire write lock on slot_subscription_map");
return;
}
let rx = svm_locker.subscribe_for_slot_updates();
loop {
if let Ok(guard) = slot_active.read() {
if guard.get(&sub_id).is_none() {
break;
}
} else {
log::error!("Failed to acquire read lock on slot_subscription_map");
break;
}
let Ok(slot_info) = rx.try_recv() else {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
continue;
};
let Ok(guard) = slot_active.read() else {
log::error!("Failed to acquire read lock on slots_subscription_map");
break;
};
let Some(sink) = guard.get(&sub_id) else {
log::error!("Failed to get sink for subscription ID");
break;
};
if let Err(e) = sink.notify(Ok(slot_info)) {
log::error!("Failed to notify client about slots update: {e}");
break;
}
}
});
}
fn slot_unsubscribe(
&self,
_meta: Option<Self::Metadata>,
subscription: SubscriptionId,
) -> Result<bool> {
if let Ok(mut guard) = self.slot_subscription_map.write() {
guard.remove(&subscription)
} else {
log::error!("Failed to acquire write lock on slot_subscription_map");
return Err(Error {
code: ErrorCode::InternalError,
message: "Internal error.".into(),
data: None,
});
};
Ok(true)
}
fn logs_subscribe(
&self,
meta: Self::Metadata,
subscriber: Subscriber<RpcResponse<RpcLogsResponse>>,
mentions: Option<RpcTransactionLogsFilter>,
commitment: Option<CommitmentConfig>,
) {
let _ = meta
.as_ref()
.map(|m| m.log_debug("Websocket 'logs_subscribe' connection established"));
let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
let sub_id = SubscriptionId::Number(id as u64);
let sink = match subscriber.assign_id(sub_id.clone()) {
Ok(sink) => sink,
Err(e) => {
log::error!("Failed to assign subscription ID: {:?}", e);
return;
}
};
let mentions = mentions.unwrap_or(RpcTransactionLogsFilter::All);
let commitment = commitment.unwrap_or_default().commitment;
let logs_active = Arc::clone(&self.logs_subscription_map);
let meta = meta.clone();
let svm_locker = match meta.get_svm_locker() {
Ok(locker) => locker,
Err(e) => {
log::error!("Failed to get SVM locker for slot subscription: {e}");
if let Err(e) = sink.notify(Err(e.into())) {
log::error!(
"Failed to send error notification to client for SVM locker failure: {e}"
);
}
return;
}
};
self.tokio_handle.spawn(async move {
if let Ok(mut guard) = logs_active.write() {
guard.insert(sub_id.clone(), sink);
} else {
log::error!("Failed to acquire write lock on slot_subscription_map");
return;
}
let rx = svm_locker.subscribe_for_logs_updates(&commitment, &mentions);
loop {
if let Ok(guard) = logs_active.read() {
if guard.get(&sub_id).is_none() {
break;
}
} else {
log::error!("Failed to acquire read lock on slot_subscription_map");
break;
}
let Ok((slot, value)) = rx.try_recv() else {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
continue;
};
let Ok(guard) = logs_active.read() else {
log::error!("Failed to acquire read lock on logs_subscription_map");
break;
};
let Some(sink) = guard.get(&sub_id) else {
log::error!("Failed to get sink for subscription ID");
break;
};
if let Err(e) = sink.notify(Ok(RpcResponse {
context: RpcResponseContext::new(slot),
value,
})) {
log::error!("Failed to notify client about logs update: {e}");
break;
}
}
});
}
fn logs_unsubscribe(
&self,
_meta: Option<Self::Metadata>,
subscription: SubscriptionId,
) -> Result<bool> {
if let Ok(mut guard) = self.logs_subscription_map.write() {
guard.remove(&subscription);
} else {
log::error!("Failed to acquire write lock on logs_subscription_map");
return Err(Error {
code: ErrorCode::InternalError,
message: "Internal error.".into(),
data: None,
});
};
Ok(true)
}
fn root_subscribe(&self, meta: Self::Metadata, _subscriber: Subscriber<RpcResponse<()>>) {
let _ = meta
.as_ref()
.map(|m| m.log_warn("Websocket method 'root_subscribe' is uninmplemented"));
}
fn root_unsubscribe(
&self,
_meta: Option<Self::Metadata>,
_subscription: SubscriptionId,
) -> Result<bool> {
Ok(true)
}
fn program_subscribe(
&self,
meta: Self::Metadata,
subscriber: Subscriber<RpcResponse<RpcKeyedAccount>>,
pubkey_str: String,
config: Option<RpcProgramSubscribeConfig>,
) {
let _ = meta
.as_ref()
.map(|m| m.log_debug("Websocket 'program_subscribe' connection established"));
let program_id = match Pubkey::from_str(&pubkey_str) {
Ok(pk) => pk,
Err(_) => {
let error = Error {
code: ErrorCode::InvalidParams,
message: "Invalid pubkey format.".into(),
data: None,
};
if subscriber.reject(error.clone()).is_err() {
log::error!("Failed to reject subscriber for invalid pubkey format.");
}
return;
}
};
let config = config.unwrap_or_default();
let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
let sub_id = SubscriptionId::Number(id as u64);
let sink = match subscriber.assign_id(sub_id.clone()) {
Ok(sink) => sink,
Err(e) => {
log::error!("Failed to assign subscription ID: {:?}", e);
return;
}
};
let program_active = Arc::clone(&self.program_subscription_map);
let meta = meta.clone();
let svm_locker = match meta.get_svm_locker() {
Ok(locker) => locker,
Err(e) => {
log::error!("Failed to get SVM locker for program subscription: {e}");
if let Err(e) = sink.notify(Err(e.into())) {
log::error!(
"Failed to send error notification to client for SVM locker failure: {e}"
);
}
return;
}
};
let slot = svm_locker.with_svm_reader(|svm| svm.get_latest_absolute_slot());
self.tokio_handle.spawn(async move {
if let Ok(mut guard) = program_active.write() {
guard.insert(sub_id.clone(), sink);
} else {
log::error!("Failed to acquire write lock on program_subscription_map");
return;
}
let rx = svm_locker.subscribe_for_program_updates(
&program_id,
config.encoding,
config.filters,
);
loop {
if let Ok(guard) = program_active.read() {
if guard.get(&sub_id).is_none() {
break;
}
} else {
log::error!("Failed to acquire read lock on program_subscription_map");
break;
}
let Ok(keyed_account) = rx.try_recv() else {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
continue;
};
let Ok(guard) = program_active.read() else {
log::error!("Failed to acquire read lock on program_subscription_map");
break;
};
let Some(sink) = guard.get(&sub_id) else {
log::error!("Failed to get sink for subscription ID");
break;
};
if let Err(e) = sink.notify(Ok(RpcResponse {
context: RpcResponseContext::new(slot),
value: keyed_account,
})) {
log::error!("Failed to notify client about program account update: {e}");
break;
}
}
});
}
fn program_unsubscribe(
&self,
_meta: Option<Self::Metadata>,
subscription: SubscriptionId,
) -> Result<bool> {
if let Ok(mut guard) = self.program_subscription_map.write() {
guard.remove(&subscription);
} else {
log::error!("Failed to acquire write lock on program_subscription_map");
return Err(Error {
code: ErrorCode::InternalError,
message: "Internal error.".into(),
data: None,
});
};
Ok(true)
}
fn slots_updates_subscribe(
&self,
meta: Self::Metadata,
_subscriber: Subscriber<RpcResponse<()>>,
) {
let _ = meta
.as_ref()
.map(|m| m.log_warn("Websocket method 'slots_updates_subscribe' is uninmplemented"));
}
fn slots_updates_unsubscribe(
&self,
_meta: Option<Self::Metadata>,
_subscription: SubscriptionId,
) -> Result<bool> {
Ok(true)
}
fn block_subscribe(&self, meta: Self::Metadata, _subscriber: Subscriber<RpcResponse<()>>) {
let _ = meta
.as_ref()
.map(|m| m.log_warn("Websocket method 'block_subscribe' is uninmplemented"));
}
fn block_unsubscribe(
&self,
_meta: Option<Self::Metadata>,
_subscription: SubscriptionId,
) -> Result<bool> {
Ok(true)
}
fn vote_subscribe(&self, meta: Self::Metadata, _subscriber: Subscriber<RpcResponse<()>>) {
let _ = meta
.as_ref()
.map(|m| m.log_warn("Websocket method 'vote_subscribe' is uninmplemented"));
}
fn vote_unsubscribe(
&self,
_meta: Option<Self::Metadata>,
_subscription: SubscriptionId,
) -> Result<bool> {
Ok(true)
}
fn snapshot_subscribe(
&self,
meta: Self::Metadata,
subscriber: Subscriber<crate::surfnet::SnapshotImportNotification>,
snapshot_url: String,
) {
let _ = meta
.as_ref()
.map(|m| m.log_debug("Websocket 'snapshot_subscribe' connection established"));
if snapshot_url.is_empty() {
let error = Error {
code: ErrorCode::InvalidParams,
message: "Invalid snapshot URL: URL cannot be empty.".into(),
data: None,
};
if let Err(e) = subscriber.reject(error.clone()) {
log::error!("Failed to reject subscriber: {:?}", e);
}
return;
}
let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
let sub_id = SubscriptionId::Number(id as u64);
let sink = match subscriber.assign_id(sub_id.clone()) {
Ok(sink) => sink,
Err(e) => {
log::error!("Failed to assign subscription ID: {:?}", e);
return;
}
};
let snapshot_active = Arc::clone(&self.snapshot_subscription_map);
let meta = meta.clone();
let svm_locker = match meta.get_svm_locker() {
Ok(locker) => locker,
Err(e) => {
log::error!("Failed to get SVM locker for snapshot subscription: {e}");
if let Err(e) = sink.notify(Err(e.into())) {
log::error!(
"Failed to send error notification to client for SVM locker failure: {e}"
);
}
return;
}
};
self.tokio_handle.spawn(async move {
if let Ok(mut guard) = snapshot_active.write() {
guard.insert(sub_id.clone(), sink);
} else {
log::error!("Failed to acquire write lock on snapshot_subscription_map");
return;
}
let snapshot_id = format!(
"snapshot_{}",
chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0)
);
let rx = svm_locker.subscribe_for_snapshot_import_updates(&snapshot_url, &snapshot_id);
loop {
if let Ok(guard) = snapshot_active.read() {
if guard.get(&sub_id).is_none() {
break;
}
} else {
log::error!("Failed to acquire read lock on snapshot_subscription_map");
break;
}
let Ok(notification) = rx.try_recv() else {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
continue;
};
let Ok(guard) = snapshot_active.read() else {
log::error!("Failed to acquire read lock on snapshot_subscription_map");
break;
};
let Some(sink) = guard.get(&sub_id) else {
log::error!("Failed to get sink for subscription ID");
break;
};
if let Err(e) = sink.notify(Ok(notification)) {
log::error!("Failed to notify client about snapshot import update: {e}");
break;
}
if let Ok(guard) = snapshot_active.read() {
if let Some(_sink) = guard.get(&sub_id) {
}
}
}
});
}
fn snapshot_unsubscribe(
&self,
_meta: Option<Self::Metadata>,
subscription: SubscriptionId,
) -> Result<bool> {
if let Ok(mut guard) = self.snapshot_subscription_map.write() {
guard.remove(&subscription);
} else {
log::error!("Failed to acquire write lock on snapshot_subscription_map");
return Err(Error {
code: ErrorCode::InternalError,
message: "Internal error.".into(),
data: None,
});
};
Ok(true)
}
}