Skip to main content

alloy_rpc_types_eth/
pubsub.rs

1//! Ethereum types for pub-sub
2
3use crate::{Filter, Header, Log, Transaction, TransactionReceipt};
4use alloc::{boxed::Box, vec::Vec};
5use alloy_primitives::B256;
6use alloy_serde::WithOtherFields;
7
8/// Subscription result.
9#[derive(Clone, Debug, PartialEq, Eq)]
10#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
11#[cfg_attr(feature = "serde", serde(untagged))]
12pub enum SubscriptionResult<T = Transaction, R = TransactionReceipt> {
13    /// New block header.
14    Header(Box<WithOtherFields<Header>>),
15    /// Log
16    Log(Box<Log>),
17    /// Transaction hash
18    TransactionHash(B256),
19    /// Full Transaction
20    FullTransaction(Box<T>),
21    /// SyncStatus
22    SyncState(PubSubSyncStatus),
23    /// Transaction Receipts
24    TransactionReceipts(Vec<R>),
25}
26
27/// Response type for a SyncStatus subscription.
28#[derive(Clone, Copy, Debug, PartialEq, Eq)]
29#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
30#[cfg_attr(feature = "serde", serde(untagged))]
31pub enum PubSubSyncStatus {
32    /// If not currently syncing, this should always be `false`.
33    Simple(bool),
34    /// Syncing metadata.
35    Detailed(SyncStatusMetadata),
36}
37
38/// Sync status metadata.
39#[derive(Clone, Copy, Debug, PartialEq, Eq)]
40#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
41#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
42pub struct SyncStatusMetadata {
43    /// Whether the node is currently syncing.
44    pub syncing: bool,
45    /// The starting block.
46    #[cfg_attr(feature = "serde", serde(with = "alloy_serde::quantity"))]
47    pub starting_block: u64,
48    /// The current block.
49    #[cfg_attr(feature = "serde", serde(with = "alloy_serde::quantity"))]
50    pub current_block: u64,
51    /// The highest block.
52    #[cfg_attr(
53        feature = "serde",
54        serde(
55            default,
56            skip_serializing_if = "Option::is_none",
57            with = "alloy_serde::quantity::opt"
58        )
59    )]
60    pub highest_block: Option<u64>,
61}
62
63#[cfg(feature = "serde")]
64impl<T, R> serde::Serialize for SubscriptionResult<T, R>
65where
66    T: serde::Serialize,
67    R: serde::Serialize,
68{
69    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
70    where
71        S: serde::Serializer,
72    {
73        match *self {
74            Self::Header(ref header) => header.serialize(serializer),
75            Self::Log(ref log) => log.serialize(serializer),
76            Self::TransactionHash(ref hash) => hash.serialize(serializer),
77            Self::FullTransaction(ref tx) => tx.serialize(serializer),
78            Self::SyncState(ref sync) => sync.serialize(serializer),
79            Self::TransactionReceipts(ref receipts) => receipts.serialize(serializer),
80        }
81    }
82}
83
84/// Subscription kind.
85#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
86#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
87#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
88pub enum SubscriptionKind {
89    /// New block headers subscription.
90    ///
91    /// Fires a notification each time a new header is appended to the chain, including chain
92    /// reorganizations. In case of a chain reorganization the subscription will emit all new
93    /// headers for the new chain. Therefore the subscription can emit multiple headers on the same
94    /// height.
95    NewHeads,
96    /// Logs subscription.
97    ///
98    /// Returns logs that are included in new imported blocks and match the given filter criteria.
99    /// In case of a chain reorganization previous sent logs that are on the old chain will be
100    /// resent with the removed property set to true. Logs from transactions that ended up in the
101    /// new chain are emitted. Therefore, a subscription can emit logs for the same transaction
102    /// multiple times.
103    Logs,
104    /// New Pending Transactions subscription.
105    ///
106    /// Returns the hash or full tx for all transactions that are added to the pending state and
107    /// are signed with a key that is available in the node. When a transaction that was
108    /// previously part of the canonical chain isn't part of the new canonical chain after a
109    /// reorganization its again emitted.
110    NewPendingTransactions,
111    /// Node syncing status subscription.
112    ///
113    /// Indicates when the node starts or stops synchronizing. The result can either be a boolean
114    /// indicating that the synchronization has started (true), finished (false) or an object with
115    /// various progress indicators.
116    Syncing,
117    /// New transaction receipts subscription.
118    ///
119    /// Returns transaction receipts that are included in new imported blocks and match the given
120    /// filter criteria. In case of a chain reorganization the subscription will emit transaction
121    /// receipts for the new chain if they match the filter criteria. Therefore, the subscription
122    /// can emit same transaction receipts multiple times.
123    TransactionReceipts,
124}
125
126/// Parameters for transaction receipts subscription.
127///
128/// # Example
129///
130/// Subscribe to specific transaction receipts:
131///
132/// ```json
133/// {
134///   "transactionHashes": [
135///     "0x5c504ed432cb51138bcf09aa5e8a410dd4a1e204ef84bfed1be16dfba1b22060"
136///   ]
137/// }
138/// ```
139///
140/// Subscribe to all transaction receipts (no filter):
141///
142/// ```json
143/// {
144///   "transactionHashes": null
145/// }
146/// ```
147///
148/// ```json
149/// {
150///   "transactionHashes": []
151/// }
152/// ```
153#[derive(Clone, Debug, Default, PartialEq, Eq)]
154#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
155#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
156pub struct TransactionReceiptsParams {
157    /// Optional list of transaction hashes to filter by.
158    ///
159    /// If not provided or empty, all transaction receipts will be returned.
160    #[cfg_attr(feature = "serde", serde(default))]
161    pub transaction_hashes: Option<Vec<B256>>,
162}
163
164impl core::fmt::Display for SubscriptionKind {
165    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
166        match self {
167            Self::NewHeads => write!(f, "newHeads"),
168            Self::Logs => write!(f, "logs"),
169            Self::NewPendingTransactions => write!(f, "newPendingTransactions"),
170            Self::Syncing => write!(f, "syncing"),
171            Self::TransactionReceipts => write!(f, "transactionReceipts"),
172        }
173    }
174}
175
176impl core::str::FromStr for SubscriptionKind {
177    type Err = &'static str;
178
179    fn from_str(s: &str) -> Result<Self, Self::Err> {
180        match s {
181            "newHeads" => Ok(Self::NewHeads),
182            "logs" => Ok(Self::Logs),
183            "newPendingTransactions" => Ok(Self::NewPendingTransactions),
184            "syncing" => Ok(Self::Syncing),
185            "transactionReceipts" => Ok(Self::TransactionReceipts),
186            _ => Err("invalid subscription kind"),
187        }
188    }
189}
190
191/// Any additional parameters for a subscription.
192#[derive(Clone, Debug, Default, PartialEq, Eq)]
193pub enum Params {
194    /// No parameters passed.
195    #[default]
196    None,
197    /// Log parameters.
198    Logs(Box<Filter>),
199    /// Boolean parameter for new pending transactions.
200    Bool(bool),
201    /// Transaction receipts parameters.
202    TransactionReceipts(TransactionReceiptsParams),
203}
204
205impl Params {
206    /// Returns true if it's a bool parameter.
207    #[inline]
208    pub const fn is_bool(&self) -> bool {
209        matches!(self, Self::Bool(_))
210    }
211
212    /// Returns true if it's a log parameter.
213    #[inline]
214    pub const fn is_logs(&self) -> bool {
215        matches!(self, Self::Logs(_))
216    }
217
218    /// Creates a new [`Params`] from a [`serde_json::Value`].
219    #[cfg(feature = "serde")]
220    pub fn from_json_value(v: serde_json::Value) -> Result<Self, serde_json::Error> {
221        if v.is_null() {
222            return Ok(Self::None);
223        }
224
225        if let Some(val) = v.as_bool() {
226            return Ok(val.into());
227        }
228
229        let is_transaction_receipts =
230            v.as_object().is_some_and(|obj| obj.contains_key("transactionHashes"));
231        if is_transaction_receipts {
232            return serde_json::from_value::<TransactionReceiptsParams>(v).map(Into::into);
233        }
234
235        serde_json::from_value::<Filter>(v).map(Into::into)
236    }
237}
238
239impl From<Filter> for Params {
240    fn from(filter: Filter) -> Self {
241        Self::Logs(Box::new(filter))
242    }
243}
244
245impl From<bool> for Params {
246    fn from(value: bool) -> Self {
247        Self::Bool(value)
248    }
249}
250
251impl From<TransactionReceiptsParams> for Params {
252    fn from(params: TransactionReceiptsParams) -> Self {
253        Self::TransactionReceipts(params)
254    }
255}
256
257#[cfg(feature = "serde")]
258impl serde::Serialize for Params {
259    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
260    where
261        S: serde::Serializer,
262    {
263        match self {
264            Self::None => (&[] as &[serde_json::Value]).serialize(serializer),
265            Self::Logs(logs) => logs.serialize(serializer),
266            Self::Bool(full) => full.serialize(serializer),
267            Self::TransactionReceipts(params) => params.serialize(serializer),
268        }
269    }
270}
271
272#[cfg(feature = "serde")]
273impl<'a> serde::Deserialize<'a> for Params {
274    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
275    where
276        D: serde::Deserializer<'a>,
277    {
278        let v = serde_json::Value::deserialize(deserializer)?;
279        Self::from_json_value(v).map_err(serde::de::Error::custom)
280    }
281}
282
283#[cfg(test)]
284mod tests {
285    use super::*;
286    use alloy_primitives::hex;
287    use similar_asserts::assert_eq;
288
289    #[test]
290    #[cfg(feature = "serde")]
291    fn params_serde() {
292        // Test deserialization of boolean parameter
293        let s: Params = serde_json::from_str("true").unwrap();
294        assert_eq!(s, Params::Bool(true));
295
296        // Test deserialization of null (None) parameter
297        let s: Params = serde_json::from_str("null").unwrap();
298        assert_eq!(s, Params::None);
299
300        // Test deserialization of log parameters
301        let filter = Filter::default();
302        let s: Params = serde_json::from_str(&serde_json::to_string(&filter).unwrap()).unwrap();
303        assert_eq!(s, Params::Logs(Box::new(filter)));
304
305        // Test deserialization of transaction receipts parameters
306        let json = r#"{"transactionHashes":["0x5c504ed432cb51138bcf09aa5e8a410dd4a1e204ef84bfed1be16dfba1b22060"]}"#;
307        let param: Params = serde_json::from_str(json).unwrap();
308        match param {
309            Params::TransactionReceipts(params) => {
310                assert_eq!(
311                    params.transaction_hashes,
312                    Some(vec![B256::from(hex!(
313                        "0x5c504ed432cb51138bcf09aa5e8a410dd4a1e204ef84bfed1be16dfba1b22060"
314                    ))])
315                );
316            }
317            _ => panic!("Expected TransactionReceipts variant"),
318        }
319
320        // Test deserialization of transaction receipts parameters, with null transactionHashes
321        let json = r#"{"transactionHashes":null}"#;
322        let param: Params = serde_json::from_str(json).unwrap();
323        match param {
324            Params::TransactionReceipts(params) => {
325                assert_eq!(params.transaction_hashes, None);
326            }
327            _ => panic!("Expected TransactionReceipts variant"),
328        }
329
330        // Test deserialization of transaction receipts parameters, with empty transactionHashes
331        let json = r#"{"transactionHashes":[]}"#;
332        let param: Params = serde_json::from_str(json).unwrap();
333        match param {
334            Params::TransactionReceipts(params) => {
335                assert_eq!(params.transaction_hashes, Some(vec![]));
336            }
337            _ => panic!("Expected TransactionReceipts variant"),
338        }
339    }
340
341    #[test]
342    fn params_is_bool() {
343        // Check if the `is_bool` method correctly identifies boolean parameters
344        let param = Params::Bool(true);
345        assert!(param.is_bool());
346
347        let param = Params::None;
348        assert!(!param.is_bool());
349
350        let param = Params::Logs(Box::default());
351        assert!(!param.is_bool());
352    }
353
354    #[test]
355    fn params_is_logs() {
356        // Check if the `is_logs` method correctly identifies log parameters
357        let param = Params::Logs(Box::default());
358        assert!(param.is_logs());
359
360        let param = Params::None;
361        assert!(!param.is_logs());
362
363        let param = Params::Bool(true);
364        assert!(!param.is_logs());
365    }
366
367    #[test]
368    fn params_from_filter() {
369        let filter = Filter::default();
370        let param: Params = filter.clone().into();
371        assert_eq!(param, Params::Logs(Box::new(filter)));
372    }
373
374    #[test]
375    fn params_from_bool() {
376        let param: Params = true.into();
377        assert_eq!(param, Params::Bool(true));
378
379        let param: Params = false.into();
380        assert_eq!(param, Params::Bool(false));
381    }
382
383    #[test]
384    fn params_from_transaction_receipts() {
385        let params = TransactionReceiptsParams { transaction_hashes: Some(vec![B256::random()]) };
386        let param: Params = params.clone().into();
387        assert_eq!(param, Params::TransactionReceipts(params));
388    }
389
390    #[test]
391    #[cfg(feature = "serde")]
392    fn subscription_kind_str_roundtrip() {
393        use core::str::FromStr;
394
395        for kind in [
396            SubscriptionKind::NewHeads,
397            SubscriptionKind::Logs,
398            SubscriptionKind::NewPendingTransactions,
399            SubscriptionKind::Syncing,
400            SubscriptionKind::TransactionReceipts,
401        ] {
402            let s = kind.to_string();
403            let parsed: SubscriptionKind = s.parse().unwrap();
404            assert_eq!(kind, parsed);
405
406            // Verify FromStr matches serde
407            let serde_str = serde_json::to_string(&kind).unwrap();
408            let serde_str = serde_str.trim_matches('"');
409            assert_eq!(s, serde_str);
410            assert_eq!(SubscriptionKind::from_str(serde_str).unwrap(), kind);
411        }
412    }
413
414    #[test]
415    #[cfg(feature = "serde")]
416    fn params_serialize_none() {
417        let param = Params::None;
418        let serialized = serde_json::to_string(&param).unwrap();
419        assert_eq!(serialized, "[]");
420    }
421
422    #[test]
423    #[cfg(feature = "serde")]
424    fn params_serialize_bool() {
425        let param = Params::Bool(true);
426        let serialized = serde_json::to_string(&param).unwrap();
427        assert_eq!(serialized, "true");
428
429        let param = Params::Bool(false);
430        let serialized = serde_json::to_string(&param).unwrap();
431        assert_eq!(serialized, "false");
432    }
433
434    #[test]
435    #[cfg(feature = "serde")]
436    fn params_serialize_logs() {
437        let filter = Filter::default();
438        let param = Params::Logs(Box::new(filter.clone()));
439        let serialized = serde_json::to_string(&param).unwrap();
440        let expected = serde_json::to_string(&filter).unwrap();
441        assert_eq!(serialized, expected);
442    }
443
444    #[test]
445    #[cfg(feature = "serde")]
446    fn params_serialize_transaction_receipts() {
447        let params = TransactionReceiptsParams {
448            transaction_hashes: Some(vec![B256::from(hex!(
449                "0x5c504ed432cb51138bcf09aa5e8a410dd4a1e204ef84bfed1be16dfba1b22060"
450            ))]),
451        };
452        let param = Params::TransactionReceipts(params);
453        let serialized = serde_json::to_string(&param).unwrap();
454        let expected = r#"{"transactionHashes":["0x5c504ed432cb51138bcf09aa5e8a410dd4a1e204ef84bfed1be16dfba1b22060"]}"#;
455        assert_eq!(serialized, expected);
456
457        // None must be serialized as `null` (not omitted) so that round-tripping
458        // through `Params::from_json_value` keeps the `TransactionReceipts` variant.
459        let param = Params::TransactionReceipts(TransactionReceiptsParams::default());
460        let serialized = serde_json::to_string(&param).unwrap();
461        assert_eq!(serialized, r#"{"transactionHashes":null}"#);
462        let roundtrip: Params = serde_json::from_str(&serialized).unwrap();
463        assert_eq!(roundtrip, param);
464    }
465
466    #[test]
467    #[cfg(feature = "serde")]
468    fn sync_status_metadata_serde() {
469        let metadata = SyncStatusMetadata {
470            syncing: true,
471            starting_block: 900,
472            current_block: 902,
473            highest_block: Some(1108),
474        };
475
476        let serialized = serde_json::to_string(&metadata).unwrap();
477        // Block numbers should be hex-encoded per EIP-1474
478        assert_eq!(
479            serialized,
480            r#"{"syncing":true,"startingBlock":"0x384","currentBlock":"0x386","highestBlock":"0x454"}"#
481        );
482
483        // Roundtrip
484        let deserialized: SyncStatusMetadata = serde_json::from_str(&serialized).unwrap();
485        assert_eq!(metadata, deserialized);
486
487        // Test without highest_block
488        let metadata_no_highest = SyncStatusMetadata {
489            syncing: false,
490            starting_block: 0,
491            current_block: 100,
492            highest_block: None,
493        };
494        let serialized = serde_json::to_string(&metadata_no_highest).unwrap();
495        assert_eq!(serialized, r#"{"syncing":false,"startingBlock":"0x0","currentBlock":"0x64"}"#);
496
497        let deserialized: SyncStatusMetadata = serde_json::from_str(&serialized).unwrap();
498        assert_eq!(metadata_no_highest, deserialized);
499    }
500}