ethrex_rpc/subscription_manager.rs
1//! Actor-based subscription manager for WebSocket `eth_subscribe` connections.
2//!
3//! The `SubscriptionManager` is a GenServer actor that owns all subscription
4//! state. It receives `NewHead` messages from block producers / fork choice
5//! handlers and fans out notifications to all connected WebSocket clients
6//! through per-connection `mpsc` channels.
7//!
8//! Using an actor removes the need for a `broadcast` channel and eliminates
9//! the "lagged subscriber" problem: when a connection drops, its sender is
10//! removed during the next `new_head` fan-out rather than silently accumulating
11//! unread messages.
12
13use ethrex_common::types::BlockHeader;
14use rand::RngCore;
15use serde_json::Value;
16use spawned_concurrency::{
17 actor,
18 error::ActorError,
19 protocol,
20 tasks::{Actor, ActorRef, ActorStart as _, Context, Handler, Response},
21};
22use std::collections::HashMap;
23use tokio::sync::mpsc::Sender;
24use tracing::{debug, warn};
25
/// Number of notifications a single subscriber channel is expected to buffer.
///
/// The channels themselves are created by whoever calls `subscribe`; this
/// module only publishes the capacity they should use. When a subscriber's
/// channel is full (e.g. a slow WebSocket client), the fan-out drops that one
/// notification instead of blocking the actor, and the subscription stays
/// registered. The value is deliberately far below the 20,000 the original
/// note attributes to Geth, because every queued item is an
/// already-serialized JSON string.
pub const SUBSCRIBER_CHANNEL_CAPACITY: usize = 512;

/// Upper bound on active subscriptions for one WebSocket connection.
///
/// Not enforced in this module; presumably checked by the connection handler
/// before it calls `subscribe` (confirm against the caller).
pub const MAX_SUBSCRIPTIONS_PER_CONNECTION: usize = 128;

/// Upper bound on active subscriptions across all connections combined.
///
/// This is what bounds the actor's worst-case memory: every subscriber owns a
/// [`SUBSCRIBER_CHANNEL_CAPACITY`]-slot `mpsc` channel, so the per-connection
/// limit (`128`) says nothing about total state once many connections are
/// open. When the limit is hit, `subscribe` answers `None`.
pub const MAX_TOTAL_SUBSCRIPTIONS: usize = 10_000;
44
45/// Actor that manages all active WebSocket subscriptions.
46///
47/// Each subscription is identified by a hex-encoded string ID and backed by a
48/// bounded `Sender<String>` that delivers serialised notification JSON to the
49/// corresponding WebSocket write-loop.
50#[derive(Default)]
51pub struct SubscriptionManager {
52 subscribers: HashMap<String, Sender<String>>,
53}
54
55/// Messages understood by the [`SubscriptionManager`].
56#[protocol]
57pub trait SubscriptionManagerProtocol: Send + Sync {
58 /// Broadcast a new block header to all `newHeads` subscribers.
59 ///
60 /// The actor handles serialization and hash injection. Callers just
61 /// pass the raw `BlockHeader`. Dead subscribers are removed automatically
62 /// when their channel is closed.
63 fn new_head(&self, header: BlockHeader) -> Result<(), ActorError>;
64
65 /// Register a new subscriber.
66 ///
67 /// Returns `Some(id)` with the subscription ID that the client should use
68 /// in subsequent `eth_unsubscribe` calls, or `None` if the global cap
69 /// [`MAX_TOTAL_SUBSCRIPTIONS`] has been reached.
70 fn subscribe(&self, sender: Sender<String>) -> Response<Option<String>>;
71
72 /// Remove a subscriber by ID.
73 ///
74 /// Returns `true` if the subscription existed and was removed, `false`
75 /// otherwise.
76 fn unsubscribe(&self, id: String) -> Response<bool>;
77}
78
79#[actor(protocol = SubscriptionManagerProtocol)]
80impl SubscriptionManager {
81 /// Spawn the actor and return a handle.
82 pub fn spawn() -> ActorRef<SubscriptionManager> {
83 SubscriptionManager::default().start()
84 }
85
86 #[send_handler]
87 async fn handle_new_head(
88 &mut self,
89 msg: subscription_manager_protocol::NewHead,
90 _ctx: &Context<Self>,
91 ) {
92 if self.subscribers.is_empty() {
93 return;
94 }
95
96 // Serialize the header and inject the computed block hash.
97 let header = msg.header;
98 let block_hash = header.hash();
99 let mut header_value = match serde_json::to_value(&header) {
100 Ok(v) => v,
101 Err(e) => {
102 warn!("Failed to serialize block header for newHeads: {e}");
103 return;
104 }
105 };
106 if let Value::Object(ref mut map) = header_value {
107 map.insert(
108 "hash".to_string(),
109 Value::String(format!("{block_hash:#x}")),
110 );
111 }
112
113 let mut dead_ids: Vec<String> = Vec::new();
114
115 for (sub_id, sender) in &self.subscribers {
116 let notification = build_subscription_notification(sub_id, &header_value);
117 match sender.try_send(notification) {
118 Ok(()) => {}
119 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
120 dead_ids.push(sub_id.clone());
121 }
122 Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
123 warn!(sub_id = %sub_id, "Subscriber channel full, dropping notification");
124 }
125 }
126 }
127
128 for id in dead_ids {
129 debug!(sub_id = %id, "Removing closed newHeads subscriber");
130 self.subscribers.remove(&id);
131 }
132 }
133
134 #[request_handler]
135 async fn handle_subscribe(
136 &mut self,
137 msg: subscription_manager_protocol::Subscribe,
138 _ctx: &Context<Self>,
139 ) -> Option<String> {
140 if self.subscribers.len() >= MAX_TOTAL_SUBSCRIPTIONS {
141 warn!(
142 cap = MAX_TOTAL_SUBSCRIPTIONS,
143 "Global subscription cap reached, refusing new subscriber"
144 );
145 return None;
146 }
147 let id = generate_subscription_id();
148 self.subscribers.insert(id.clone(), msg.sender);
149 Some(id)
150 }
151
152 #[request_handler]
153 async fn handle_unsubscribe(
154 &mut self,
155 msg: subscription_manager_protocol::Unsubscribe,
156 _ctx: &Context<Self>,
157 ) -> bool {
158 self.subscribers.remove(&msg.id).is_some()
159 }
160}
161
162/// Build the standard Ethereum subscription notification envelope.
163///
164/// `result` is cloned per subscriber — cheap relative to re-serializing the
165/// header. Using `serde_json::json!` avoids hand-rolled string interpolation,
166/// which would silently produce malformed JSON if `sub_id` or the result ever
167/// contained unescaped characters.
168fn build_subscription_notification(sub_id: &str, result: &Value) -> String {
169 serde_json::json!({
170 "jsonrpc": "2.0",
171 "method": "eth_subscription",
172 "params": {
173 "subscription": sub_id,
174 "result": result,
175 },
176 })
177 .to_string()
178}
179
180/// Generate a random hex subscription ID (16 bytes / 128 bits).
181fn generate_subscription_id() -> String {
182 let mut bytes = [0u8; 16];
183 rand::thread_rng().fill_bytes(&mut bytes);
184 format!("0x{}", hex::encode(bytes))
185}