Skip to main content

sol_parser_sdk/grpc/
subscribe_builder.rs

1//! Yellowstone [`SubscribeRequest`] 构造(DEX 订阅、钱包 mentions 转账监控等共用)。
2
3use std::collections::HashMap;
4
5use yellowstone_grpc_proto::prelude::{
6    CommitmentLevel, SubscribeRequest, SubscribeRequestFilterAccounts,
7    SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterTransactions,
8};
9
10use super::types::{AccountFilter, EventTypeFilter, TransactionFilter};
11
12#[inline]
13fn tx_filter_to_proto(f: &TransactionFilter) -> SubscribeRequestFilterTransactions {
14    SubscribeRequestFilterTransactions {
15        vote: Some(false),
16        failed: Some(false),
17        signature: None,
18        account_include: f.account_include.clone(),
19        account_exclude: f.account_exclude.clone(),
20        account_required: f.account_required.clone(),
21    }
22}
23
24#[inline]
25fn acc_filter_to_proto(f: &AccountFilter) -> SubscribeRequestFilterAccounts {
26    SubscribeRequestFilterAccounts {
27        account: f.account.clone(),
28        owner: f.owner.clone(),
29        filters: f.filters.clone(),
30        nonempty_txn_signature: None,
31        cuckoo_accounts_filter: None,
32    }
33}
34
35fn finalize(
36    transactions: HashMap<String, SubscribeRequestFilterTransactions>,
37    accounts: HashMap<String, SubscribeRequestFilterAccounts>,
38    blocks_meta: HashMap<String, SubscribeRequestFilterBlocksMeta>,
39    commitment: CommitmentLevel,
40) -> SubscribeRequest {
41    SubscribeRequest {
42        slots: HashMap::new(),
43        accounts,
44        transactions,
45        transactions_status: HashMap::new(),
46        blocks: HashMap::new(),
47        blocks_meta,
48        entry: HashMap::new(),
49        commitment: Some(commitment as i32),
50        accounts_data_slice: Vec::new(),
51        ping: None,
52        from_slot: None,
53    }
54}
55
56/// 构建订阅请求:`tx_0`…`tx_n`、`acc_0`…、**commitment = Processed**(与历史行为一致)。
57pub fn build_subscribe_request(
58    tx_filters: &[TransactionFilter],
59    acc_filters: &[AccountFilter],
60) -> SubscribeRequest {
61    build_subscribe_request_with_commitment(tx_filters, acc_filters, CommitmentLevel::Processed)
62}
63
64/// 与 [`build_subscribe_request`] 相同,可指定 commitment(例如 Confirmed)。
65pub fn build_subscribe_request_with_commitment(
66    tx_filters: &[TransactionFilter],
67    acc_filters: &[AccountFilter],
68    commitment: CommitmentLevel,
69) -> SubscribeRequest {
70    build_subscribe_request_with_event_filter(tx_filters, acc_filters, None, commitment)
71}
72
73/// 与 [`build_subscribe_request_with_commitment`] 相同,但会按事件过滤器订阅区块元数据。
74pub fn build_subscribe_request_with_event_filter(
75    tx_filters: &[TransactionFilter],
76    acc_filters: &[AccountFilter],
77    event_type_filter: Option<&EventTypeFilter>,
78    commitment: CommitmentLevel,
79) -> SubscribeRequest {
80    let transactions = tx_filters
81        .iter()
82        .enumerate()
83        .map(|(i, f)| (format!("tx_{}", i), tx_filter_to_proto(f)))
84        .collect();
85    let accounts = acc_filters
86        .iter()
87        .enumerate()
88        .map(|(i, f)| (format!("acc_{}", i), acc_filter_to_proto(f)))
89        .collect();
90    let blocks_meta = if event_type_filter.map(|f| f.includes_block_meta()).unwrap_or(false) {
91        HashMap::from([("block_meta".to_string(), SubscribeRequestFilterBlocksMeta {})])
92    } else {
93        HashMap::new()
94    };
95    finalize(transactions, accounts, blocks_meta, commitment)
96}
97
98/// 自定义交易订阅在 `SubscribeRequest.transactions` 中的 key(便于日志区分多条订阅)。
99pub fn build_subscribe_transaction_filters_named<N: AsRef<str>>(
100    named_tx_filters: &[(N, TransactionFilter)],
101    acc_filters: &[AccountFilter],
102    commitment: CommitmentLevel,
103) -> SubscribeRequest {
104    let transactions = named_tx_filters
105        .iter()
106        .map(|(name, f)| (name.as_ref().to_string(), tx_filter_to_proto(f)))
107        .collect();
108    let accounts = acc_filters
109        .iter()
110        .enumerate()
111        .map(|(i, f)| (format!("acc_{}", i), acc_filter_to_proto(f)))
112        .collect();
113    finalize(transactions, accounts, HashMap::new(), commitment)
114}
115
116#[cfg(test)]
117mod tests {
118    use super::*;
119    use crate::grpc::types::EventType;
120
121    #[test]
122    fn event_filter_controls_block_meta_subscription() {
123        let without_block_meta = build_subscribe_request_with_event_filter(
124            &[],
125            &[],
126            Some(&EventTypeFilter::include_only(vec![EventType::PumpFunTrade])),
127            CommitmentLevel::Processed,
128        );
129        assert!(without_block_meta.blocks_meta.is_empty());
130
131        let with_block_meta = build_subscribe_request_with_event_filter(
132            &[],
133            &[],
134            Some(&EventTypeFilter::include_only(vec![EventType::BlockMeta])),
135            CommitmentLevel::Processed,
136        );
137        assert_eq!(with_block_meta.blocks_meta.len(), 1);
138    }
139}