Skip to main content

richat_filter/
filter.rs

1use {
2    crate::{
3        config::{
4            ConfigFilter, ConfigFilterAccounts, ConfigFilterAccountsDataSlice,
5            ConfigFilterAccountsFilter, ConfigFilterAccountsFilterLamports, ConfigFilterBlocks,
6            ConfigFilterCommitment, ConfigFilterSlots, ConfigFilterTransactions, MAX_DATA_SIZE,
7            MAX_FILTERS,
8        },
9        message::{
10            Message, MessageAccount, MessageBlock, MessageBlockCreatedAt, MessageBlockMeta,
11            MessageEntry, MessageRef, MessageSlot, MessageTransaction,
12        },
13        protobuf::encode::{
14            SubscribeUpdateMessageLimited, SubscribeUpdateMessageProst, UpdateOneofLimitedEncode,
15            UpdateOneofLimitedEncodeAccount, UpdateOneofLimitedEncodeAccountInner,
16            UpdateOneofLimitedEncodeBlock, UpdateOneofLimitedEncodeTransactionStatus,
17        },
18    },
19    arrayvec::ArrayVec,
20    prost::{Message as _, bytes::BufMut, encoding::decode_varint},
21    richat_proto::geyser::{
22        SlotStatus, SubscribeUpdateAccount, SubscribeUpdateAccountInfo, SubscribeUpdateBlock,
23        SubscribeUpdateSlot, SubscribeUpdateTransaction, SubscribeUpdateTransactionStatus,
24        subscribe_update::UpdateOneof,
25    },
26    smallvec::{SmallVec, smallvec_inline},
27    solana_account::ReadableAccount,
28    solana_commitment_config::CommitmentLevel,
29    solana_pubkey::Pubkey,
30    solana_signature::Signature,
31    spl_token_2022_interface::{
32        generic_token_account::GenericTokenAccount, state::Account as TokenAccount,
33    },
34    std::{
35        borrow::{Borrow, Cow},
36        collections::{HashMap, HashSet},
37        ops::{Not, Range},
38        sync::Arc,
39    },
40};
41
42#[derive(Debug, Clone, PartialEq, Eq, Hash)]
43struct FilterName(Arc<String>);
44
45impl AsRef<str> for FilterName {
46    #[inline]
47    fn as_ref(&self) -> &str {
48        &self.0
49    }
50}
51
52impl Borrow<str> for FilterName {
53    #[inline]
54    fn borrow(&self) -> &str {
55        &self.0[..]
56    }
57}
58
59#[derive(Debug, Default)]
60struct FilterNames {
61    names: HashSet<FilterName>,
62}
63
64impl FilterNames {
65    fn get(&mut self, name: &str) -> FilterName {
66        match self.names.get(name) {
67            Some(name) => name.clone(),
68            None => {
69                let name = FilterName(Arc::new(name.into()));
70                self.names.insert(name.clone());
71                name
72            }
73        }
74    }
75}
76
77#[derive(Debug, Clone)]
78pub struct Filter {
79    slots: FilterSlots,
80    accounts: FilterAccounts,
81    accounts_data_slices: FilterAccountDataSlices,
82    transactions: FilterTransactions,
83    transactions_status: FilterTransactions,
84    entries: FilterEntries,
85    blocks_meta: FilterBlocksMeta,
86    blocks: FilterBlocks,
87    commitment: ConfigFilterCommitment,
88}
89
90impl Default for Filter {
91    fn default() -> Self {
92        Self {
93            slots: FilterSlots::default(),
94            accounts: FilterAccounts::default(),
95            accounts_data_slices: FilterAccountDataSlices::default(),
96            transactions: FilterTransactions {
97                filter_type: FilterTransactionsType::Transaction,
98                filters: HashMap::default(),
99            },
100            transactions_status: FilterTransactions {
101                filter_type: FilterTransactionsType::TransactionStatus,
102                filters: HashMap::default(),
103            },
104            entries: FilterEntries::default(),
105            blocks_meta: FilterBlocksMeta::default(),
106            blocks: FilterBlocks::default(),
107            commitment: ConfigFilterCommitment::default(),
108        }
109    }
110}
111
112impl Filter {
113    pub fn new(config: &ConfigFilter) -> Self {
114        let mut names = FilterNames::default();
115        Self {
116            slots: FilterSlots::new(&mut names, &config.slots),
117            accounts: FilterAccounts::new(&mut names, &config.accounts),
118            accounts_data_slices: FilterAccountDataSlices::new(&config.accounts_data_slice),
119            transactions: FilterTransactions::new(
120                &mut names,
121                &config.transactions,
122                FilterTransactionsType::Transaction,
123            ),
124            transactions_status: FilterTransactions::new(
125                &mut names,
126                &config.transactions_status,
127                FilterTransactionsType::TransactionStatus,
128            ),
129            entries: FilterEntries::new(&mut names, &config.entries),
130            blocks_meta: FilterBlocksMeta::new(&mut names, &config.blocks_meta),
131            blocks: FilterBlocks::new(&mut names, &config.blocks),
132            commitment: config
133                .commitment
134                .unwrap_or(ConfigFilterCommitment::Processed),
135        }
136    }
137
138    pub fn contains_blocks(&self) -> bool {
139        !self.blocks.filters.is_empty()
140    }
141
142    pub const fn commitment(&self) -> ConfigFilterCommitment {
143        self.commitment
144    }
145
146    pub fn get_updates<'a>(
147        &'a self,
148        message: &'a Message,
149        commitment: CommitmentLevel,
150    ) -> SmallVec<[FilteredUpdate<'a>; 2]> {
151        self.get_updates_ref(message.into(), commitment)
152    }
153
154    pub fn get_updates_ref<'a>(
155        &'a self,
156        message: MessageRef<'a>,
157        commitment: CommitmentLevel,
158    ) -> SmallVec<[FilteredUpdate<'a>; 2]> {
159        let mut vec = SmallVec::<[FilteredUpdate; 2]>::new();
160        match message {
161            MessageRef::Slot(message) => {
162                if let Some(update) = self.slots.get_update(message, commitment) {
163                    vec.push(update);
164                }
165            }
166            MessageRef::Account(message) => {
167                if let Some(update) = self
168                    .accounts
169                    .get_update(message, &self.accounts_data_slices)
170                {
171                    vec.push(update);
172                }
173            }
174            MessageRef::Transaction(message) => {
175                if let Some(update) = self.transactions.get_update(message) {
176                    vec.push(update);
177                }
178                if let Some(update) = self.transactions_status.get_update(message) {
179                    vec.push(update);
180                }
181            }
182            MessageRef::Entry(message) => {
183                if let Some(update) = self.entries.get_update(message) {
184                    vec.push(update);
185                }
186            }
187            MessageRef::BlockMeta(message) => {
188                if let Some(update) = self.blocks_meta.get_update(message) {
189                    vec.push(update);
190                }
191            }
192            MessageRef::Block(message) => {
193                for update in self.blocks.get_updates(message, &self.accounts_data_slices) {
194                    vec.push(update);
195                }
196            }
197        }
198        vec
199    }
200}
201
202#[derive(Debug, Default, Clone, Copy)]
203struct FilterSlotsInner {
204    filter_by_commitment: bool,
205    interslot_updates: bool,
206}
207
208impl FilterSlotsInner {
209    fn new(filter: ConfigFilterSlots) -> Self {
210        Self {
211            filter_by_commitment: filter.filter_by_commitment.unwrap_or_default(),
212            interslot_updates: filter.interslot_updates.unwrap_or_default(),
213        }
214    }
215}
216
217#[derive(Debug, Default, Clone)]
218struct FilterSlots {
219    filters: HashMap<FilterName, FilterSlotsInner>,
220}
221
222impl FilterSlots {
223    fn new(names: &mut FilterNames, configs: &HashMap<String, ConfigFilterSlots>) -> Self {
224        Self {
225            filters: configs
226                .iter()
227                .map(|(name, filter)| (names.get(name), FilterSlotsInner::new(*filter)))
228                .collect(),
229        }
230    }
231
232    fn get_update<'a>(
233        &'a self,
234        message: &'a MessageSlot,
235        commitment: CommitmentLevel,
236    ) -> Option<FilteredUpdate<'a>> {
237        let msg_status = message.status();
238
239        let filters = self
240            .filters
241            .iter()
242            .filter_map(|(name, inner)| {
243                if (!inner.filter_by_commitment
244                    || ((msg_status == SlotStatus::SlotProcessed
245                        && commitment == CommitmentLevel::Processed)
246                        || (msg_status == SlotStatus::SlotConfirmed
247                            && commitment == CommitmentLevel::Confirmed)
248                        || (msg_status == SlotStatus::SlotFinalized
249                            && commitment == CommitmentLevel::Finalized)))
250                    && (inner.interslot_updates
251                        || matches!(
252                            msg_status,
253                            SlotStatus::SlotProcessed
254                                | SlotStatus::SlotConfirmed
255                                | SlotStatus::SlotFinalized
256                        ))
257                {
258                    Some(name.as_ref())
259                } else {
260                    None
261                }
262            })
263            .collect::<FilteredUpdateFilters>();
264
265        filters.is_empty().not().then(|| FilteredUpdate {
266            filters,
267            filtered_update: FilteredUpdateType::Slot { message },
268        })
269    }
270}
271
272#[derive(Debug, Clone, Copy, PartialEq, Eq)]
273enum FilterAccountsLamports {
274    Eq(u64),
275    Ne(u64),
276    Lt(u64),
277    Gt(u64),
278}
279
280impl From<ConfigFilterAccountsFilterLamports> for FilterAccountsLamports {
281    fn from(cmp: ConfigFilterAccountsFilterLamports) -> Self {
282        match cmp {
283            ConfigFilterAccountsFilterLamports::Eq(value) => Self::Eq(value),
284            ConfigFilterAccountsFilterLamports::Ne(value) => Self::Ne(value),
285            ConfigFilterAccountsFilterLamports::Lt(value) => Self::Lt(value),
286            ConfigFilterAccountsFilterLamports::Gt(value) => Self::Gt(value),
287        }
288    }
289}
290
291impl FilterAccountsLamports {
292    const fn is_match(self, lamports: u64) -> bool {
293        match self {
294            Self::Eq(value) => value == lamports,
295            Self::Ne(value) => value != lamports,
296            Self::Lt(value) => value > lamports,
297            Self::Gt(value) => value < lamports,
298        }
299    }
300}
301
302#[derive(Debug, Default, Clone)]
303struct FilterAccountsState {
304    memcmp: ArrayVec<(usize, ArrayVec<u8, MAX_DATA_SIZE>), MAX_FILTERS>,
305    datasize: Option<usize>,
306    token_account_state: bool,
307    lamports: ArrayVec<FilterAccountsLamports, MAX_FILTERS>,
308}
309
310impl FilterAccountsState {
311    fn new(filters: &[ConfigFilterAccountsFilter]) -> Self {
312        let mut me = Self::default();
313        for filter in filters {
314            match filter {
315                ConfigFilterAccountsFilter::Memcmp { offset, data } => {
316                    me.memcmp.push((*offset, data.iter().cloned().collect()));
317                }
318                ConfigFilterAccountsFilter::DataSize(datasize) => {
319                    me.datasize = Some(*datasize as usize);
320                }
321                ConfigFilterAccountsFilter::TokenAccountState => {
322                    me.token_account_state = true;
323                }
324                ConfigFilterAccountsFilter::Lamports(value) => {
325                    me.lamports.push((*value).into());
326                }
327            }
328        }
329        me
330    }
331
332    fn is_match(&self, lamports: u64, data: &[u8]) -> bool {
333        if matches!(self.datasize, Some(datasize) if data.len() != datasize) {
334            return false;
335        }
336        if self.token_account_state && !TokenAccount::valid_account_data(data) {
337            return false;
338        }
339        if self.lamports.iter().any(|f| !f.is_match(lamports)) {
340            return false;
341        }
342        for (offset, bytes) in self.memcmp.iter() {
343            if data.len() < *offset + bytes.len() {
344                return false;
345            }
346            let data = &data[*offset..*offset + bytes.len()];
347            if data != bytes.as_slice() {
348                return false;
349            }
350        }
351        true
352    }
353}
354
355#[derive(Debug, Clone)]
356struct FilterAccountsInner {
357    account: HashSet<Pubkey>,
358    owner: HashSet<Pubkey>,
359    filters: Option<FilterAccountsState>,
360    nonempty_txn_signature: Option<bool>,
361}
362
363#[derive(Debug, Default, Clone)]
364struct FilterAccounts {
365    filters: HashMap<FilterName, FilterAccountsInner>,
366}
367
368impl FilterAccounts {
369    fn new(names: &mut FilterNames, configs: &HashMap<String, ConfigFilterAccounts>) -> Self {
370        let mut me = Self::default();
371        for (name, filter) in configs {
372            me.filters.insert(
373                names.get(name),
374                FilterAccountsInner {
375                    account: filter.account.iter().copied().collect(),
376                    owner: filter.owner.iter().copied().collect(),
377                    filters: if filter.filters.is_empty() {
378                        None
379                    } else {
380                        Some(FilterAccountsState::new(&filter.filters))
381                    },
382                    nonempty_txn_signature: filter.nonempty_txn_signature,
383                },
384            );
385        }
386        me
387    }
388
389    fn get_update<'a>(
390        &'a self,
391        message: &'a MessageAccount,
392        data_slices: &'a FilterAccountDataSlices,
393    ) -> Option<FilteredUpdate<'a>> {
394        let msg_pubkey = message.pubkey();
395        let msg_owner = message.owner();
396        let msg_lamports = message.lamports();
397        let msg_data = message.data();
398        let msg_nonempty_txn_signature = message.nonempty_txn_signature();
399
400        let filters = self
401            .filters
402            .iter()
403            .filter_map(|(name, filter)| {
404                if !filter.account.is_empty() && !filter.account.contains(msg_pubkey) {
405                    return None;
406                }
407
408                if !filter.owner.is_empty() && !filter.owner.contains(msg_owner) {
409                    return None;
410                }
411
412                if let Some(filters) = &filter.filters {
413                    if !filters.is_match(msg_lamports, msg_data) {
414                        return None;
415                    }
416                }
417
418                if let Some(nonempty_txn_signature) = filter.nonempty_txn_signature {
419                    if nonempty_txn_signature != msg_nonempty_txn_signature {
420                        return None;
421                    }
422                }
423
424                Some(name.as_ref())
425            })
426            .collect::<FilteredUpdateFilters>();
427
428        filters.is_empty().not().then(|| FilteredUpdate {
429            filters,
430            filtered_update: FilteredUpdateType::Account {
431                message,
432                data_slices,
433            },
434        })
435    }
436}
437
438#[derive(Debug, Default, Clone)]
439pub struct FilterAccountDataSlices(SmallVec<[Range<usize>; 4]>);
440
441impl FilterAccountDataSlices {
442    fn empty() -> &'static FilterAccountDataSlices {
443        static EMPTY: FilterAccountDataSlices = FilterAccountDataSlices(SmallVec::new_const());
444        &EMPTY
445    }
446
447    fn new(data_slices: &[ConfigFilterAccountsDataSlice]) -> Self {
448        let mut vec = SmallVec::new();
449        for data_slice in data_slices {
450            vec.push(Range {
451                start: data_slice.offset as usize,
452                end: (data_slice.offset + data_slice.length) as usize,
453            })
454        }
455        Self(vec)
456    }
457
458    pub fn is_empty(&self) -> bool {
459        self.0.is_empty()
460    }
461
462    pub fn get_slice<'a>(&self, source: &'a [u8]) -> Cow<'a, [u8]> {
463        if self.0.is_empty() {
464            Cow::Borrowed(source)
465        } else if self.0.len() == 1 {
466            if source.len() >= self.0[0].end {
467                Cow::Borrowed(&source[self.0[0].start..self.0[0].end])
468            } else {
469                Cow::Borrowed(&[])
470            }
471        } else {
472            let mut data = Vec::with_capacity(self.0.iter().map(|ds| ds.end - ds.start).sum());
473            for data_slice in self.0.iter() {
474                if source.len() >= data_slice.end {
475                    data.extend_from_slice(&source[data_slice.start..data_slice.end]);
476                }
477            }
478            Cow::Owned(data)
479        }
480    }
481}
482
483#[derive(Debug, Clone, Copy, PartialEq, Eq)]
484enum FilterTransactionsType {
485    Transaction,
486    TransactionStatus,
487}
488
489#[derive(Debug, Clone)]
490struct FilterTransactionsInner {
491    vote: Option<bool>,
492    failed: Option<bool>,
493    signature: Option<Signature>,
494    account_include: HashSet<Pubkey>,
495    account_exclude: HashSet<Pubkey>,
496    account_required: HashSet<Pubkey>,
497}
498
499#[derive(Debug, Clone)]
500struct FilterTransactions {
501    filter_type: FilterTransactionsType,
502    filters: HashMap<FilterName, FilterTransactionsInner>,
503}
504
505impl FilterTransactions {
506    fn new(
507        names: &mut FilterNames,
508        configs: &HashMap<String, ConfigFilterTransactions>,
509        filter_type: FilterTransactionsType,
510    ) -> Self {
511        let mut filters = HashMap::new();
512        for (name, filter) in configs {
513            filters.insert(
514                names.get(name),
515                FilterTransactionsInner {
516                    vote: filter.vote,
517                    failed: filter.failed,
518                    signature: filter.signature,
519                    account_include: filter.account_include.iter().copied().collect(),
520                    account_exclude: filter.account_exclude.iter().copied().collect(),
521                    account_required: filter.account_required.iter().copied().collect(),
522                },
523            );
524        }
525        Self {
526            filter_type,
527            filters,
528        }
529    }
530
531    fn get_update<'a>(&'a self, message: &'a MessageTransaction) -> Option<FilteredUpdate<'a>> {
532        let msg_vote = message.vote();
533        let msg_failed = message.failed();
534        let msg_signature = message.signature_ref();
535        let msg_account_keys = message.account_keys();
536
537        let filters = self
538            .filters
539            .iter()
540            .filter_map(|(name, filter)| {
541                if let Some(is_vote) = filter.vote {
542                    if is_vote != msg_vote {
543                        return None;
544                    }
545                }
546
547                if let Some(is_failed) = filter.failed {
548                    if is_failed != msg_failed {
549                        return None;
550                    }
551                }
552
553                if let Some(signature) = &filter.signature {
554                    if signature.as_ref() != msg_signature {
555                        return None;
556                    }
557                }
558
559                if !filter.account_include.is_empty()
560                    && filter
561                        .account_include
562                        .intersection(msg_account_keys)
563                        .next()
564                        .is_none()
565                {
566                    return None;
567                }
568
569                if !filter.account_exclude.is_empty()
570                    && filter
571                        .account_exclude
572                        .intersection(msg_account_keys)
573                        .next()
574                        .is_some()
575                {
576                    return None;
577                }
578
579                if !filter.account_required.is_empty()
580                    && !filter.account_required.is_subset(msg_account_keys)
581                {
582                    return None;
583                }
584
585                Some(name.as_ref())
586            })
587            .collect::<FilteredUpdateFilters>();
588
589        filters.is_empty().not().then(|| FilteredUpdate {
590            filters,
591            filtered_update: match self.filter_type {
592                FilterTransactionsType::Transaction => FilteredUpdateType::Transaction { message },
593                FilterTransactionsType::TransactionStatus => {
594                    FilteredUpdateType::TransactionStatus { message }
595                }
596            },
597        })
598    }
599}
600
601#[derive(Debug, Default, Clone)]
602struct FilterEntries {
603    filters: Vec<FilterName>,
604}
605
606impl FilterEntries {
607    fn new(names: &mut FilterNames, configs: &HashSet<String>) -> Self {
608        Self {
609            filters: configs.iter().map(|name| names.get(name)).collect(),
610        }
611    }
612
613    fn get_update<'a>(&'a self, message: &'a MessageEntry) -> Option<FilteredUpdate<'a>> {
614        let filters = self
615            .filters
616            .iter()
617            .map(|f| f.as_ref())
618            .collect::<FilteredUpdateFilters>();
619
620        filters.is_empty().not().then(|| FilteredUpdate {
621            filters,
622            filtered_update: FilteredUpdateType::Entry { message },
623        })
624    }
625}
626
627#[derive(Debug, Default, Clone)]
628struct FilterBlocksMeta {
629    filters: Vec<FilterName>,
630}
631
632impl FilterBlocksMeta {
633    fn new(names: &mut FilterNames, configs: &HashSet<String>) -> Self {
634        Self {
635            filters: configs.iter().map(|name| names.get(name)).collect(),
636        }
637    }
638
639    fn get_update<'a>(&'a self, message: &'a MessageBlockMeta) -> Option<FilteredUpdate<'a>> {
640        let filters = self
641            .filters
642            .iter()
643            .map(|f| f.as_ref())
644            .collect::<FilteredUpdateFilters>();
645
646        filters.is_empty().not().then(|| FilteredUpdate {
647            filters,
648            filtered_update: FilteredUpdateType::BlockMeta { message },
649        })
650    }
651}
652
653#[derive(Debug, Clone)]
654struct FilterBlocksInner {
655    account_include: HashSet<Pubkey>,
656    include_transactions: Option<bool>,
657    include_accounts: Option<bool>,
658    include_entries: Option<bool>,
659}
660
661#[derive(Debug, Default, Clone)]
662struct FilterBlocks {
663    filters: HashMap<FilterName, FilterBlocksInner>,
664}
665
666impl FilterBlocks {
667    fn new(names: &mut FilterNames, configs: &HashMap<String, ConfigFilterBlocks>) -> Self {
668        let mut me = Self::default();
669        for (name, filter) in configs {
670            me.filters.insert(
671                names.get(name),
672                FilterBlocksInner {
673                    account_include: filter.account_include.iter().copied().collect(),
674                    include_transactions: filter.include_transactions,
675                    include_accounts: filter.include_accounts,
676                    include_entries: filter.include_entries,
677                },
678            );
679        }
680        me
681    }
682
683    fn get_updates<'a>(
684        &'a self,
685        message: &'a MessageBlock,
686        data_slices: &'a FilterAccountDataSlices,
687    ) -> impl Iterator<Item = FilteredUpdate<'a>> {
688        self.filters.iter().map(|(name, filter)| {
689            let accounts = if filter.include_accounts == Some(true) {
690                message
691                    .accounts
692                    .iter()
693                    .enumerate()
694                    .filter_map(|(idx, account)| {
695                        if !filter.account_include.is_empty()
696                            && !filter.account_include.contains(account.pubkey())
697                        {
698                            None
699                        } else {
700                            Some(idx)
701                        }
702                    })
703                    .collect::<Vec<_>>()
704            } else {
705                vec![]
706            };
707
708            let transactions = if matches!(filter.include_transactions, None | Some(true)) {
709                message
710                    .transactions
711                    .iter()
712                    .enumerate()
713                    .filter_map(|(idx, tx)| {
714                        if !filter.account_include.is_empty()
715                            && filter
716                                .account_include
717                                .intersection(tx.account_keys())
718                                .next()
719                                .is_none()
720                        {
721                            None
722                        } else {
723                            Some(idx)
724                        }
725                    })
726                    .collect::<Vec<_>>()
727            } else {
728                vec![]
729            };
730
731            FilteredUpdate {
732                filters: smallvec_inline![name.as_ref(); 8],
733                filtered_update: FilteredUpdateType::Block {
734                    message,
735                    accounts,
736                    transactions,
737                    entries: filter.include_entries == Some(true),
738                    data_slices,
739                },
740            }
741        })
742    }
743}
744
745#[derive(Debug, Clone)]
746pub struct FilteredUpdate<'a> {
747    pub filters: FilteredUpdateFilters<'a>,
748    pub filtered_update: FilteredUpdateType<'a>,
749}
750
751impl FilteredUpdate<'_> {
752    pub fn encode_to_vec(&self) -> Vec<u8> {
753        let mut bytes = vec![];
754        self.encode(&mut bytes);
755        bytes
756    }
757
758    pub fn encode(&self, buf: &mut impl BufMut) {
759        match &self.filtered_update {
760            FilteredUpdateType::Slot { message } => match message {
761                MessageSlot::Limited {
762                    created_at,
763                    buffer,
764                    range,
765                    ..
766                } => SubscribeUpdateMessageLimited {
767                    filters: &self.filters,
768                    update: UpdateOneofLimitedEncode::Slot(
769                        &buffer.as_slice()[range.start..range.end],
770                    ),
771                    created_at: *created_at,
772                }
773                .encode(buf),
774                MessageSlot::Prost {
775                    slot,
776                    parent,
777                    status,
778                    dead_error,
779                    created_at,
780                    ..
781                } => SubscribeUpdateMessageProst {
782                    filters: &self.filters,
783                    update: UpdateOneof::Slot(SubscribeUpdateSlot {
784                        slot: *slot,
785                        parent: *parent,
786                        status: *status as i32,
787                        dead_error: dead_error.clone(),
788                    }),
789                    created_at: *created_at,
790                }
791                .encode(buf),
792            },
793            FilteredUpdateType::Account {
794                message,
795                data_slices,
796            } => match message {
797                MessageAccount::Limited {
798                    pubkey,
799                    owner,
800                    lamports,
801                    executable,
802                    rent_epoch,
803                    data,
804                    txn_signature_offset,
805                    write_version,
806                    slot,
807                    is_startup,
808                    created_at,
809                    buffer,
810                    account_offset: _,
811                    range,
812                } => SubscribeUpdateMessageLimited {
813                    filters: &self.filters,
814                    update: UpdateOneofLimitedEncode::Account(if data_slices.is_empty() {
815                        UpdateOneofLimitedEncodeAccount::Slice(
816                            &buffer.as_slice()[range.start..range.end],
817                        )
818                    } else {
819                        UpdateOneofLimitedEncodeAccount::Fields {
820                            account: UpdateOneofLimitedEncodeAccountInner {
821                                pubkey,
822                                lamports: *lamports,
823                                owner,
824                                executable: *executable,
825                                rent_epoch: *rent_epoch,
826                                data: data_slices
827                                    .get_slice(&buffer.as_slice()[data.start..data.end]),
828                                write_version: {
829                                    let mut buffer = &buffer.as_slice()[*write_version..];
830                                    decode_varint(&mut buffer).expect("already verified")
831                                },
832                                txn_signature: txn_signature_offset
833                                    .map(|offset| &buffer.as_slice()[offset..offset + 64]),
834                            },
835                            slot: *slot,
836                            is_startup: *is_startup,
837                        }
838                    }),
839                    created_at: *created_at,
840                }
841                .encode(buf),
842                MessageAccount::Prost {
843                    account,
844                    slot,
845                    is_startup,
846                    created_at,
847                    ..
848                } => SubscribeUpdateMessageProst {
849                    filters: &self.filters,
850                    update: UpdateOneof::Account(SubscribeUpdateAccount {
851                        account: Some(SubscribeUpdateAccountInfo {
852                            pubkey: account.pubkey.clone(),
853                            lamports: account.lamports,
854                            owner: account.owner.clone(),
855                            executable: account.executable,
856                            rent_epoch: account.rent_epoch,
857                            data: data_slices.get_slice(&account.data).into_owned(),
858                            write_version: account.write_version,
859                            txn_signature: account.txn_signature.clone(),
860                        }),
861                        slot: *slot,
862                        is_startup: *is_startup,
863                    }),
864                    created_at: *created_at,
865                }
866                .encode(buf),
867            },
868            FilteredUpdateType::Transaction { message } => match message {
869                MessageTransaction::Limited {
870                    created_at,
871                    buffer,
872                    range,
873                    ..
874                } => SubscribeUpdateMessageLimited {
875                    filters: &self.filters,
876                    update: UpdateOneofLimitedEncode::Transaction(
877                        &buffer.as_slice()[range.start..range.end],
878                    ),
879                    created_at: *created_at,
880                }
881                .encode(buf),
882                MessageTransaction::Prost {
883                    transaction,
884                    slot,
885                    created_at,
886                    ..
887                } => SubscribeUpdateMessageProst {
888                    filters: &self.filters,
889                    update: UpdateOneof::Transaction(SubscribeUpdateTransaction {
890                        transaction: Some(transaction.clone()),
891                        slot: *slot,
892                    }),
893                    created_at: *created_at,
894                }
895                .encode(buf),
896            },
897            FilteredUpdateType::TransactionStatus { message } => match message {
898                MessageTransaction::Limited {
899                    error,
900                    is_vote,
901                    index,
902                    slot,
903                    created_at,
904                    ..
905                } => SubscribeUpdateMessageLimited {
906                    filters: &self.filters,
907                    update: UpdateOneofLimitedEncode::TransactionStatus(
908                        UpdateOneofLimitedEncodeTransactionStatus {
909                            slot: *slot,
910                            signature: message.signature_ref(),
911                            is_vote: *is_vote,
912                            index: *index,
913                            err: error.clone(),
914                        },
915                    ),
916                    created_at: *created_at,
917                }
918                .encode(buf),
919                MessageTransaction::Prost {
920                    error,
921                    transaction,
922                    slot,
923                    created_at,
924                    ..
925                } => SubscribeUpdateMessageProst {
926                    filters: &self.filters,
927                    update: UpdateOneof::TransactionStatus(SubscribeUpdateTransactionStatus {
928                        slot: *slot,
929                        signature: message.signature_ref().to_vec(),
930                        is_vote: transaction.is_vote,
931                        index: transaction.index,
932                        err: error.clone(),
933                    }),
934                    created_at: *created_at,
935                }
936                .encode(buf),
937            },
938            FilteredUpdateType::Entry { message } => match message {
939                MessageEntry::Limited {
940                    created_at,
941                    buffer,
942                    range,
943                    ..
944                } => SubscribeUpdateMessageLimited {
945                    filters: &self.filters,
946                    update: UpdateOneofLimitedEncode::Entry(
947                        &buffer.as_slice()[range.start..range.end],
948                    ),
949                    created_at: *created_at,
950                }
951                .encode(buf),
952                MessageEntry::Prost {
953                    entry, created_at, ..
954                } => SubscribeUpdateMessageProst {
955                    filters: &self.filters,
956                    update: UpdateOneof::Entry(entry.clone()),
957                    created_at: *created_at,
958                }
959                .encode(buf),
960            },
961            FilteredUpdateType::BlockMeta { message } => match message {
962                MessageBlockMeta::Limited {
963                    created_at,
964                    buffer,
965                    range,
966                    ..
967                } => SubscribeUpdateMessageLimited {
968                    filters: &self.filters,
969                    update: UpdateOneofLimitedEncode::BlockMeta(
970                        &buffer.as_slice()[range.start..range.end],
971                    ),
972                    created_at: *created_at,
973                }
974                .encode(buf),
975                MessageBlockMeta::Prost {
976                    block_meta,
977                    created_at,
978                    ..
979                } => SubscribeUpdateMessageProst {
980                    filters: &self.filters,
981                    update: UpdateOneof::BlockMeta(block_meta.clone()),
982                    created_at: *created_at,
983                }
984                .encode(buf),
985            },
986            FilteredUpdateType::Block {
987                message,
988                accounts,
989                transactions,
990                entries,
991                data_slices,
992            } => match message.created_at {
993                MessageBlockCreatedAt::Limited(created_at) => {
994                    let block_meta = match message.block_meta.as_ref() {
995                        MessageBlockMeta::Limited { block_meta, .. } => block_meta,
996                        MessageBlockMeta::Prost { .. } => unreachable!(),
997                    };
998
999                    SubscribeUpdateMessageLimited {
1000                        filters: &self.filters,
1001                        update: UpdateOneofLimitedEncode::Block(UpdateOneofLimitedEncodeBlock {
1002                            slot: block_meta.slot,
1003                            blockhash: block_meta.blockhash.as_str(),
1004                            rewards: block_meta.rewards.clone(),
1005                            block_time: block_meta.block_time,
1006                            block_height: block_meta.block_height,
1007                            parent_slot: block_meta.parent_slot,
1008                            parent_blockhash: block_meta.parent_blockhash.as_str(),
1009                            executed_transaction_count: block_meta.executed_transaction_count,
1010                            transactions: transactions
1011                                .iter()
1012                                .map(|idx| match message.transactions[*idx].as_ref() {
1013                                    MessageTransaction::Limited {
1014                                        transaction_range,
1015                                        buffer,
1016                                        ..
1017                                    } => &buffer.as_slice()
1018                                        [transaction_range.start..transaction_range.end],
1019                                    MessageTransaction::Prost { .. } => unreachable!(),
1020                                })
1021                                .collect(),
1022                            updated_account_count: message.accounts.len() as u64,
1023                            accounts: accounts
1024                                .iter()
1025                                .map(|idx| match message.accounts[*idx].as_ref() {
1026                                    MessageAccount::Limited {
1027                                        pubkey,
1028                                        owner,
1029                                        lamports,
1030                                        executable,
1031                                        rent_epoch,
1032                                        data,
1033                                        txn_signature_offset,
1034                                        write_version,
1035                                        buffer,
1036                                        ..
1037                                    } => UpdateOneofLimitedEncodeAccountInner {
1038                                        pubkey,
1039                                        lamports: *lamports,
1040                                        owner,
1041                                        executable: *executable,
1042                                        rent_epoch: *rent_epoch,
1043                                        data: data_slices
1044                                            .get_slice(&buffer.as_slice()[data.start..data.end]),
1045                                        write_version: {
1046                                            let mut buffer = &buffer.as_slice()[*write_version..];
1047                                            decode_varint(&mut buffer).expect("already verified")
1048                                        },
1049                                        txn_signature: txn_signature_offset
1050                                            .map(|offset| &buffer.as_slice()[offset..offset + 64]),
1051                                    },
1052                                    MessageAccount::Prost { .. } => unreachable!(),
1053                                })
1054                                .collect(),
1055                            entries_count: block_meta.entries_count,
1056                            entries: if *entries {
1057                                message
1058                                    .entries
1059                                    .iter()
1060                                    .map(|entry| match entry.as_ref() {
1061                                        MessageEntry::Limited { buffer, range, .. } => {
1062                                            &buffer.as_slice()[range.start..range.end]
1063                                        }
1064                                        MessageEntry::Prost { .. } => unreachable!(),
1065                                    })
1066                                    .collect()
1067                            } else {
1068                                vec![]
1069                            },
1070                        }),
1071                        created_at,
1072                    }
1073                    .encode(buf)
1074                }
1075                MessageBlockCreatedAt::Prost(created_at) => {
1076                    let block_meta = match message.block_meta.as_ref() {
1077                        MessageBlockMeta::Limited { .. } => unreachable!(),
1078                        MessageBlockMeta::Prost { block_meta, .. } => block_meta,
1079                    };
1080
1081                    SubscribeUpdateMessageProst {
1082                        filters: &self.filters,
1083                        update: UpdateOneof::Block(SubscribeUpdateBlock {
1084                            slot: block_meta.slot,
1085                            blockhash: block_meta.blockhash.clone(),
1086                            rewards: block_meta.rewards.clone(),
1087                            block_time: block_meta.block_time,
1088                            block_height: block_meta.block_height,
1089                            parent_slot: block_meta.parent_slot,
1090                            parent_blockhash: block_meta.parent_blockhash.clone(),
1091                            executed_transaction_count: block_meta.executed_transaction_count,
1092                            transactions: transactions
1093                                .iter()
1094                                .map(|idx| match message.transactions[*idx].as_ref() {
1095                                    MessageTransaction::Limited { .. } => unreachable!(),
1096                                    MessageTransaction::Prost { transaction, .. } => {
1097                                        transaction.clone()
1098                                    }
1099                                })
1100                                .collect(),
1101                            updated_account_count: message.accounts.len() as u64,
1102                            accounts: accounts
1103                                .iter()
1104                                .map(|idx| match message.accounts[*idx].as_ref() {
1105                                    MessageAccount::Limited { .. } => unreachable!(),
1106                                    MessageAccount::Prost { account, .. } => {
1107                                        SubscribeUpdateAccountInfo {
1108                                            pubkey: account.pubkey.clone(),
1109                                            lamports: account.lamports,
1110                                            owner: account.owner.clone(),
1111                                            executable: account.executable,
1112                                            rent_epoch: account.rent_epoch,
1113                                            data: data_slices.get_slice(&account.data).into_owned(),
1114                                            write_version: account.write_version,
1115                                            txn_signature: account.txn_signature.clone(),
1116                                        }
1117                                    }
1118                                })
1119                                .collect(),
1120                            entries_count: block_meta.entries_count,
1121                            entries: if *entries {
1122                                message
1123                                    .entries
1124                                    .iter()
1125                                    .map(|entry| match entry.as_ref() {
1126                                        MessageEntry::Limited { .. } => unreachable!(),
1127                                        MessageEntry::Prost { entry, .. } => entry.clone(),
1128                                    })
1129                                    .collect()
1130                            } else {
1131                                vec![]
1132                            },
1133                        }),
1134                        created_at,
1135                    }
1136                    .encode(buf)
1137                }
1138            },
1139        }
1140        .expect("valid encode");
1141    }
1142}
1143
1144pub type FilteredUpdateFilters<'a> = SmallVec<[&'a str; 8]>;
1145
1146#[derive(Debug, Clone)]
1147pub enum FilteredUpdateType<'a> {
1148    Slot {
1149        message: &'a MessageSlot,
1150    },
1151    Account {
1152        message: &'a MessageAccount,
1153        data_slices: &'a FilterAccountDataSlices,
1154    },
1155    Transaction {
1156        message: &'a MessageTransaction,
1157    },
1158    TransactionStatus {
1159        message: &'a MessageTransaction,
1160    },
1161    Entry {
1162        message: &'a MessageEntry,
1163    },
1164    BlockMeta {
1165        message: &'a MessageBlockMeta,
1166    },
1167    Block {
1168        message: &'a MessageBlock,
1169        accounts: Vec<usize>,
1170        transactions: Vec<usize>,
1171        entries: bool,
1172        data_slices: &'a FilterAccountDataSlices,
1173    },
1174}
1175
1176impl<'a> From<MessageRef<'a>> for FilteredUpdateType<'a> {
1177    fn from(value: MessageRef<'a>) -> Self {
1178        match value {
1179            MessageRef::Slot(message) => Self::Slot { message },
1180            MessageRef::Account(message) => Self::Account {
1181                message,
1182                data_slices: FilterAccountDataSlices::empty(),
1183            },
1184            MessageRef::Transaction(message) => Self::Transaction { message },
1185            MessageRef::Entry(message) => Self::Entry { message },
1186            MessageRef::BlockMeta(message) => Self::BlockMeta { message },
1187            MessageRef::Block(message) => Self::Block {
1188                message,
1189                accounts: (0..message.accounts.len()).collect(),
1190                transactions: (0..message.transactions.len()).collect(),
1191                entries: true,
1192                data_slices: FilterAccountDataSlices::empty(),
1193            },
1194        }
1195    }
1196}