Skip to main content

ethrex_rpc/
subscription_manager.rs

1//! Actor-based subscription manager for WebSocket `eth_subscribe` connections.
2//!
3//! The `SubscriptionManager` is a GenServer actor that owns all subscription
4//! state. It receives `NewHead` messages from block producers / fork choice
5//! handlers and fans out notifications to all connected WebSocket clients
6//! through per-connection `mpsc` channels.
7//!
8//! Using an actor removes the need for a `broadcast` channel and eliminates
9//! the "lagged subscriber" problem: when a connection drops, its sender is
10//! removed during the next `new_head` fan-out rather than silently accumulating
11//! unread messages.
12
13use ethrex_common::types::BlockHeader;
14use rand::RngCore;
15use serde_json::Value;
16use spawned_concurrency::{
17    actor,
18    error::ActorError,
19    protocol,
20    tasks::{Actor, ActorRef, ActorStart as _, Context, Handler, Response},
21};
22use std::collections::HashMap;
23use tokio::sync::mpsc::Sender;
24use tracing::{debug, warn};
25
26/// Maximum number of buffered notifications per subscriber.
27/// If a subscriber's channel is full (slow WebSocket client), the notification
28/// is dropped rather than blocking the actor. Matches Geth's approach of
29/// dropping slow clients (Geth uses 20,000; we use a smaller buffer since
30/// each notification is already serialized JSON).
31pub const SUBSCRIBER_CHANNEL_CAPACITY: usize = 512;
32
33/// Maximum number of active subscriptions allowed per WebSocket connection.
34pub const MAX_SUBSCRIPTIONS_PER_CONNECTION: usize = 128;
35
36/// Maximum number of active subscriptions across all connections.
37///
38/// Bounds the worst-case memory of the actor: each subscriber owns a
39/// [`SUBSCRIBER_CHANNEL_CAPACITY`]-slot `mpsc` channel, so the per-connection
40/// cap alone (`128`) does not bound total state when many connections are
41/// open. `subscribe` returns `None` once this is reached and the calling
42/// `eth_subscribe` request fails with an internal error.
43pub const MAX_TOTAL_SUBSCRIPTIONS: usize = 10_000;
44
45/// Actor that manages all active WebSocket subscriptions.
46///
47/// Each subscription is identified by a hex-encoded string ID and backed by a
48/// bounded `Sender<String>` that delivers serialised notification JSON to the
49/// corresponding WebSocket write-loop.
50#[derive(Default)]
51pub struct SubscriptionManager {
52    subscribers: HashMap<String, Sender<String>>,
53}
54
55/// Messages understood by the [`SubscriptionManager`].
56#[protocol]
57pub trait SubscriptionManagerProtocol: Send + Sync {
58    /// Broadcast a new block header to all `newHeads` subscribers.
59    ///
60    /// The actor handles serialization and hash injection. Callers just
61    /// pass the raw `BlockHeader`. Dead subscribers are removed automatically
62    /// when their channel is closed.
63    fn new_head(&self, header: BlockHeader) -> Result<(), ActorError>;
64
65    /// Register a new subscriber.
66    ///
67    /// Returns `Some(id)` with the subscription ID that the client should use
68    /// in subsequent `eth_unsubscribe` calls, or `None` if the global cap
69    /// [`MAX_TOTAL_SUBSCRIPTIONS`] has been reached.
70    fn subscribe(&self, sender: Sender<String>) -> Response<Option<String>>;
71
72    /// Remove a subscriber by ID.
73    ///
74    /// Returns `true` if the subscription existed and was removed, `false`
75    /// otherwise.
76    fn unsubscribe(&self, id: String) -> Response<bool>;
77}
78
79#[actor(protocol = SubscriptionManagerProtocol)]
80impl SubscriptionManager {
81    /// Spawn the actor and return a handle.
82    pub fn spawn() -> ActorRef<SubscriptionManager> {
83        SubscriptionManager::default().start()
84    }
85
86    #[send_handler]
87    async fn handle_new_head(
88        &mut self,
89        msg: subscription_manager_protocol::NewHead,
90        _ctx: &Context<Self>,
91    ) {
92        if self.subscribers.is_empty() {
93            return;
94        }
95
96        // Serialize the header and inject the computed block hash.
97        let header = msg.header;
98        let block_hash = header.hash();
99        let mut header_value = match serde_json::to_value(&header) {
100            Ok(v) => v,
101            Err(e) => {
102                warn!("Failed to serialize block header for newHeads: {e}");
103                return;
104            }
105        };
106        if let Value::Object(ref mut map) = header_value {
107            map.insert(
108                "hash".to_string(),
109                Value::String(format!("{block_hash:#x}")),
110            );
111        }
112
113        let mut dead_ids: Vec<String> = Vec::new();
114
115        for (sub_id, sender) in &self.subscribers {
116            let notification = build_subscription_notification(sub_id, &header_value);
117            match sender.try_send(notification) {
118                Ok(()) => {}
119                Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
120                    dead_ids.push(sub_id.clone());
121                }
122                Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
123                    warn!(sub_id = %sub_id, "Subscriber channel full, dropping notification");
124                }
125            }
126        }
127
128        for id in dead_ids {
129            debug!(sub_id = %id, "Removing closed newHeads subscriber");
130            self.subscribers.remove(&id);
131        }
132    }
133
134    #[request_handler]
135    async fn handle_subscribe(
136        &mut self,
137        msg: subscription_manager_protocol::Subscribe,
138        _ctx: &Context<Self>,
139    ) -> Option<String> {
140        if self.subscribers.len() >= MAX_TOTAL_SUBSCRIPTIONS {
141            warn!(
142                cap = MAX_TOTAL_SUBSCRIPTIONS,
143                "Global subscription cap reached, refusing new subscriber"
144            );
145            return None;
146        }
147        let id = generate_subscription_id();
148        self.subscribers.insert(id.clone(), msg.sender);
149        Some(id)
150    }
151
152    #[request_handler]
153    async fn handle_unsubscribe(
154        &mut self,
155        msg: subscription_manager_protocol::Unsubscribe,
156        _ctx: &Context<Self>,
157    ) -> bool {
158        self.subscribers.remove(&msg.id).is_some()
159    }
160}
161
162/// Build the standard Ethereum subscription notification envelope.
163///
164/// `result` is cloned per subscriber — cheap relative to re-serializing the
165/// header. Using `serde_json::json!` avoids hand-rolled string interpolation,
166/// which would silently produce malformed JSON if `sub_id` or the result ever
167/// contained unescaped characters.
168fn build_subscription_notification(sub_id: &str, result: &Value) -> String {
169    serde_json::json!({
170        "jsonrpc": "2.0",
171        "method": "eth_subscription",
172        "params": {
173            "subscription": sub_id,
174            "result": result,
175        },
176    })
177    .to_string()
178}
179
180/// Generate a random hex subscription ID (16 bytes / 128 bits).
181fn generate_subscription_id() -> String {
182    let mut bytes = [0u8; 16];
183    rand::thread_rng().fill_bytes(&mut bytes);
184    format!("0x{}", hex::encode(bytes))
185}