1use {
2 crate::{
3 config::{
4 ConfigFilter, ConfigFilterAccounts, ConfigFilterAccountsDataSlice,
5 ConfigFilterAccountsFilter, ConfigFilterAccountsFilterLamports, ConfigFilterBlocks,
6 ConfigFilterCommitment, ConfigFilterSlots, ConfigFilterTransactions, MAX_DATA_SIZE,
7 MAX_FILTERS,
8 },
9 message::{
10 Message, MessageAccount, MessageBlock, MessageBlockCreatedAt, MessageBlockMeta,
11 MessageEntry, MessageRef, MessageSlot, MessageTransaction,
12 },
13 protobuf::encode::{
14 SubscribeUpdateMessageLimited, SubscribeUpdateMessageProst, UpdateOneofLimitedEncode,
15 UpdateOneofLimitedEncodeAccount, UpdateOneofLimitedEncodeAccountInner,
16 UpdateOneofLimitedEncodeBlock, UpdateOneofLimitedEncodeTransactionStatus,
17 },
18 },
19 arrayvec::ArrayVec,
20 prost::{Message as _, bytes::BufMut, encoding::decode_varint},
21 richat_proto::geyser::{
22 SlotStatus, SubscribeUpdateAccount, SubscribeUpdateAccountInfo, SubscribeUpdateBlock,
23 SubscribeUpdateSlot, SubscribeUpdateTransaction, SubscribeUpdateTransactionStatus,
24 subscribe_update::UpdateOneof,
25 },
26 smallvec::{SmallVec, smallvec_inline},
27 solana_account::ReadableAccount,
28 solana_commitment_config::CommitmentLevel,
29 solana_pubkey::Pubkey,
30 solana_signature::Signature,
31 spl_token_2022_interface::{
32 generic_token_account::GenericTokenAccount, state::Account as TokenAccount,
33 },
34 std::{
35 borrow::{Borrow, Cow},
36 collections::{HashMap, HashSet},
37 ops::{Not, Range},
38 sync::Arc,
39 },
40};
41
/// Shared, cheaply clonable name of a subscription filter.
///
/// The `Borrow<str>` impl lets a `HashSet<FilterName>` be queried with a plain
/// `&str`; the derived `Hash`/`Eq` delegate to the wrapped string, which keeps
/// them consistent with `str`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct FilterName(Arc<String>);

impl AsRef<str> for FilterName {
    /// Views the name as a string slice.
    #[inline]
    fn as_ref(&self) -> &str {
        self.0.as_str()
    }
}

impl Borrow<str> for FilterName {
    /// Borrows the name as a string slice (used for `&str` keyed set lookups).
    #[inline]
    fn borrow(&self) -> &str {
        self.0.as_str()
    }
}
58
59#[derive(Debug, Default)]
60struct FilterNames {
61 names: HashSet<FilterName>,
62}
63
64impl FilterNames {
65 fn get(&mut self, name: &str) -> FilterName {
66 match self.names.get(name) {
67 Some(name) => name.clone(),
68 None => {
69 let name = FilterName(Arc::new(name.into()));
70 self.names.insert(name.clone());
71 name
72 }
73 }
74 }
75}
76
77#[derive(Debug, Clone)]
78pub struct Filter {
79 slots: FilterSlots,
80 accounts: FilterAccounts,
81 accounts_data_slices: FilterAccountDataSlices,
82 transactions: FilterTransactions,
83 transactions_status: FilterTransactions,
84 entries: FilterEntries,
85 blocks_meta: FilterBlocksMeta,
86 blocks: FilterBlocks,
87 commitment: ConfigFilterCommitment,
88}
89
90impl Default for Filter {
91 fn default() -> Self {
92 Self {
93 slots: FilterSlots::default(),
94 accounts: FilterAccounts::default(),
95 accounts_data_slices: FilterAccountDataSlices::default(),
96 transactions: FilterTransactions {
97 filter_type: FilterTransactionsType::Transaction,
98 filters: HashMap::default(),
99 },
100 transactions_status: FilterTransactions {
101 filter_type: FilterTransactionsType::TransactionStatus,
102 filters: HashMap::default(),
103 },
104 entries: FilterEntries::default(),
105 blocks_meta: FilterBlocksMeta::default(),
106 blocks: FilterBlocks::default(),
107 commitment: ConfigFilterCommitment::default(),
108 }
109 }
110}
111
112impl Filter {
113 pub fn new(config: &ConfigFilter) -> Self {
114 let mut names = FilterNames::default();
115 Self {
116 slots: FilterSlots::new(&mut names, &config.slots),
117 accounts: FilterAccounts::new(&mut names, &config.accounts),
118 accounts_data_slices: FilterAccountDataSlices::new(&config.accounts_data_slice),
119 transactions: FilterTransactions::new(
120 &mut names,
121 &config.transactions,
122 FilterTransactionsType::Transaction,
123 ),
124 transactions_status: FilterTransactions::new(
125 &mut names,
126 &config.transactions_status,
127 FilterTransactionsType::TransactionStatus,
128 ),
129 entries: FilterEntries::new(&mut names, &config.entries),
130 blocks_meta: FilterBlocksMeta::new(&mut names, &config.blocks_meta),
131 blocks: FilterBlocks::new(&mut names, &config.blocks),
132 commitment: config
133 .commitment
134 .unwrap_or(ConfigFilterCommitment::Processed),
135 }
136 }
137
138 pub fn contains_blocks(&self) -> bool {
139 !self.blocks.filters.is_empty()
140 }
141
142 pub const fn commitment(&self) -> ConfigFilterCommitment {
143 self.commitment
144 }
145
146 pub fn get_updates<'a>(
147 &'a self,
148 message: &'a Message,
149 commitment: CommitmentLevel,
150 ) -> SmallVec<[FilteredUpdate<'a>; 2]> {
151 self.get_updates_ref(message.into(), commitment)
152 }
153
154 pub fn get_updates_ref<'a>(
155 &'a self,
156 message: MessageRef<'a>,
157 commitment: CommitmentLevel,
158 ) -> SmallVec<[FilteredUpdate<'a>; 2]> {
159 let mut vec = SmallVec::<[FilteredUpdate; 2]>::new();
160 match message {
161 MessageRef::Slot(message) => {
162 if let Some(update) = self.slots.get_update(message, commitment) {
163 vec.push(update);
164 }
165 }
166 MessageRef::Account(message) => {
167 if let Some(update) = self
168 .accounts
169 .get_update(message, &self.accounts_data_slices)
170 {
171 vec.push(update);
172 }
173 }
174 MessageRef::Transaction(message) => {
175 if let Some(update) = self.transactions.get_update(message) {
176 vec.push(update);
177 }
178 if let Some(update) = self.transactions_status.get_update(message) {
179 vec.push(update);
180 }
181 }
182 MessageRef::Entry(message) => {
183 if let Some(update) = self.entries.get_update(message) {
184 vec.push(update);
185 }
186 }
187 MessageRef::BlockMeta(message) => {
188 if let Some(update) = self.blocks_meta.get_update(message) {
189 vec.push(update);
190 }
191 }
192 MessageRef::Block(message) => {
193 for update in self.blocks.get_updates(message, &self.accounts_data_slices) {
194 vec.push(update);
195 }
196 }
197 }
198 vec
199 }
200}
201
202#[derive(Debug, Default, Clone, Copy)]
203struct FilterSlotsInner {
204 filter_by_commitment: bool,
205 interslot_updates: bool,
206}
207
208impl FilterSlotsInner {
209 fn new(filter: ConfigFilterSlots) -> Self {
210 Self {
211 filter_by_commitment: filter.filter_by_commitment.unwrap_or_default(),
212 interslot_updates: filter.interslot_updates.unwrap_or_default(),
213 }
214 }
215}
216
217#[derive(Debug, Default, Clone)]
218struct FilterSlots {
219 filters: HashMap<FilterName, FilterSlotsInner>,
220}
221
222impl FilterSlots {
223 fn new(names: &mut FilterNames, configs: &HashMap<String, ConfigFilterSlots>) -> Self {
224 Self {
225 filters: configs
226 .iter()
227 .map(|(name, filter)| (names.get(name), FilterSlotsInner::new(*filter)))
228 .collect(),
229 }
230 }
231
232 fn get_update<'a>(
233 &'a self,
234 message: &'a MessageSlot,
235 commitment: CommitmentLevel,
236 ) -> Option<FilteredUpdate<'a>> {
237 let msg_status = message.status();
238
239 let filters = self
240 .filters
241 .iter()
242 .filter_map(|(name, inner)| {
243 if (!inner.filter_by_commitment
244 || ((msg_status == SlotStatus::SlotProcessed
245 && commitment == CommitmentLevel::Processed)
246 || (msg_status == SlotStatus::SlotConfirmed
247 && commitment == CommitmentLevel::Confirmed)
248 || (msg_status == SlotStatus::SlotFinalized
249 && commitment == CommitmentLevel::Finalized)))
250 && (inner.interslot_updates
251 || matches!(
252 msg_status,
253 SlotStatus::SlotProcessed
254 | SlotStatus::SlotConfirmed
255 | SlotStatus::SlotFinalized
256 ))
257 {
258 Some(name.as_ref())
259 } else {
260 None
261 }
262 })
263 .collect::<FilteredUpdateFilters>();
264
265 filters.is_empty().not().then(|| FilteredUpdate {
266 filters,
267 filtered_update: FilteredUpdateType::Slot { message },
268 })
269 }
270}
271
272#[derive(Debug, Clone, Copy, PartialEq, Eq)]
273enum FilterAccountsLamports {
274 Eq(u64),
275 Ne(u64),
276 Lt(u64),
277 Gt(u64),
278}
279
280impl From<ConfigFilterAccountsFilterLamports> for FilterAccountsLamports {
281 fn from(cmp: ConfigFilterAccountsFilterLamports) -> Self {
282 match cmp {
283 ConfigFilterAccountsFilterLamports::Eq(value) => Self::Eq(value),
284 ConfigFilterAccountsFilterLamports::Ne(value) => Self::Ne(value),
285 ConfigFilterAccountsFilterLamports::Lt(value) => Self::Lt(value),
286 ConfigFilterAccountsFilterLamports::Gt(value) => Self::Gt(value),
287 }
288 }
289}
290
291impl FilterAccountsLamports {
292 const fn is_match(self, lamports: u64) -> bool {
293 match self {
294 Self::Eq(value) => value == lamports,
295 Self::Ne(value) => value != lamports,
296 Self::Lt(value) => value > lamports,
297 Self::Gt(value) => value < lamports,
298 }
299 }
300}
301
302#[derive(Debug, Default, Clone)]
303struct FilterAccountsState {
304 memcmp: ArrayVec<(usize, ArrayVec<u8, MAX_DATA_SIZE>), MAX_FILTERS>,
305 datasize: Option<usize>,
306 token_account_state: bool,
307 lamports: ArrayVec<FilterAccountsLamports, MAX_FILTERS>,
308}
309
310impl FilterAccountsState {
311 fn new(filters: &[ConfigFilterAccountsFilter]) -> Self {
312 let mut me = Self::default();
313 for filter in filters {
314 match filter {
315 ConfigFilterAccountsFilter::Memcmp { offset, data } => {
316 me.memcmp.push((*offset, data.iter().cloned().collect()));
317 }
318 ConfigFilterAccountsFilter::DataSize(datasize) => {
319 me.datasize = Some(*datasize as usize);
320 }
321 ConfigFilterAccountsFilter::TokenAccountState => {
322 me.token_account_state = true;
323 }
324 ConfigFilterAccountsFilter::Lamports(value) => {
325 me.lamports.push((*value).into());
326 }
327 }
328 }
329 me
330 }
331
332 fn is_match(&self, lamports: u64, data: &[u8]) -> bool {
333 if matches!(self.datasize, Some(datasize) if data.len() != datasize) {
334 return false;
335 }
336 if self.token_account_state && !TokenAccount::valid_account_data(data) {
337 return false;
338 }
339 if self.lamports.iter().any(|f| !f.is_match(lamports)) {
340 return false;
341 }
342 for (offset, bytes) in self.memcmp.iter() {
343 if data.len() < *offset + bytes.len() {
344 return false;
345 }
346 let data = &data[*offset..*offset + bytes.len()];
347 if data != bytes.as_slice() {
348 return false;
349 }
350 }
351 true
352 }
353}
354
355#[derive(Debug, Clone)]
356struct FilterAccountsInner {
357 account: HashSet<Pubkey>,
358 owner: HashSet<Pubkey>,
359 filters: Option<FilterAccountsState>,
360 nonempty_txn_signature: Option<bool>,
361}
362
363#[derive(Debug, Default, Clone)]
364struct FilterAccounts {
365 filters: HashMap<FilterName, FilterAccountsInner>,
366}
367
368impl FilterAccounts {
369 fn new(names: &mut FilterNames, configs: &HashMap<String, ConfigFilterAccounts>) -> Self {
370 let mut me = Self::default();
371 for (name, filter) in configs {
372 me.filters.insert(
373 names.get(name),
374 FilterAccountsInner {
375 account: filter.account.iter().copied().collect(),
376 owner: filter.owner.iter().copied().collect(),
377 filters: if filter.filters.is_empty() {
378 None
379 } else {
380 Some(FilterAccountsState::new(&filter.filters))
381 },
382 nonempty_txn_signature: filter.nonempty_txn_signature,
383 },
384 );
385 }
386 me
387 }
388
389 fn get_update<'a>(
390 &'a self,
391 message: &'a MessageAccount,
392 data_slices: &'a FilterAccountDataSlices,
393 ) -> Option<FilteredUpdate<'a>> {
394 let msg_pubkey = message.pubkey();
395 let msg_owner = message.owner();
396 let msg_lamports = message.lamports();
397 let msg_data = message.data();
398 let msg_nonempty_txn_signature = message.nonempty_txn_signature();
399
400 let filters = self
401 .filters
402 .iter()
403 .filter_map(|(name, filter)| {
404 if !filter.account.is_empty() && !filter.account.contains(msg_pubkey) {
405 return None;
406 }
407
408 if !filter.owner.is_empty() && !filter.owner.contains(msg_owner) {
409 return None;
410 }
411
412 if let Some(filters) = &filter.filters {
413 if !filters.is_match(msg_lamports, msg_data) {
414 return None;
415 }
416 }
417
418 if let Some(nonempty_txn_signature) = filter.nonempty_txn_signature {
419 if nonempty_txn_signature != msg_nonempty_txn_signature {
420 return None;
421 }
422 }
423
424 Some(name.as_ref())
425 })
426 .collect::<FilteredUpdateFilters>();
427
428 filters.is_empty().not().then(|| FilteredUpdate {
429 filters,
430 filtered_update: FilteredUpdateType::Account {
431 message,
432 data_slices,
433 },
434 })
435 }
436}
437
438#[derive(Debug, Default, Clone)]
439pub struct FilterAccountDataSlices(SmallVec<[Range<usize>; 4]>);
440
441impl FilterAccountDataSlices {
442 fn empty() -> &'static FilterAccountDataSlices {
443 static EMPTY: FilterAccountDataSlices = FilterAccountDataSlices(SmallVec::new_const());
444 &EMPTY
445 }
446
447 fn new(data_slices: &[ConfigFilterAccountsDataSlice]) -> Self {
448 let mut vec = SmallVec::new();
449 for data_slice in data_slices {
450 vec.push(Range {
451 start: data_slice.offset as usize,
452 end: (data_slice.offset + data_slice.length) as usize,
453 })
454 }
455 Self(vec)
456 }
457
458 pub fn is_empty(&self) -> bool {
459 self.0.is_empty()
460 }
461
462 pub fn get_slice<'a>(&self, source: &'a [u8]) -> Cow<'a, [u8]> {
463 if self.0.is_empty() {
464 Cow::Borrowed(source)
465 } else if self.0.len() == 1 {
466 if source.len() >= self.0[0].end {
467 Cow::Borrowed(&source[self.0[0].start..self.0[0].end])
468 } else {
469 Cow::Borrowed(&[])
470 }
471 } else {
472 let mut data = Vec::with_capacity(self.0.iter().map(|ds| ds.end - ds.start).sum());
473 for data_slice in self.0.iter() {
474 if source.len() >= data_slice.end {
475 data.extend_from_slice(&source[data_slice.start..data_slice.end]);
476 }
477 }
478 Cow::Owned(data)
479 }
480 }
481}
482
483#[derive(Debug, Clone, Copy, PartialEq, Eq)]
484enum FilterTransactionsType {
485 Transaction,
486 TransactionStatus,
487}
488
489#[derive(Debug, Clone)]
490struct FilterTransactionsInner {
491 vote: Option<bool>,
492 failed: Option<bool>,
493 signature: Option<Signature>,
494 account_include: HashSet<Pubkey>,
495 account_exclude: HashSet<Pubkey>,
496 account_required: HashSet<Pubkey>,
497}
498
499#[derive(Debug, Clone)]
500struct FilterTransactions {
501 filter_type: FilterTransactionsType,
502 filters: HashMap<FilterName, FilterTransactionsInner>,
503}
504
505impl FilterTransactions {
506 fn new(
507 names: &mut FilterNames,
508 configs: &HashMap<String, ConfigFilterTransactions>,
509 filter_type: FilterTransactionsType,
510 ) -> Self {
511 let mut filters = HashMap::new();
512 for (name, filter) in configs {
513 filters.insert(
514 names.get(name),
515 FilterTransactionsInner {
516 vote: filter.vote,
517 failed: filter.failed,
518 signature: filter.signature,
519 account_include: filter.account_include.iter().copied().collect(),
520 account_exclude: filter.account_exclude.iter().copied().collect(),
521 account_required: filter.account_required.iter().copied().collect(),
522 },
523 );
524 }
525 Self {
526 filter_type,
527 filters,
528 }
529 }
530
531 fn get_update<'a>(&'a self, message: &'a MessageTransaction) -> Option<FilteredUpdate<'a>> {
532 let msg_vote = message.vote();
533 let msg_failed = message.failed();
534 let msg_signature = message.signature_ref();
535 let msg_account_keys = message.account_keys();
536
537 let filters = self
538 .filters
539 .iter()
540 .filter_map(|(name, filter)| {
541 if let Some(is_vote) = filter.vote {
542 if is_vote != msg_vote {
543 return None;
544 }
545 }
546
547 if let Some(is_failed) = filter.failed {
548 if is_failed != msg_failed {
549 return None;
550 }
551 }
552
553 if let Some(signature) = &filter.signature {
554 if signature.as_ref() != msg_signature {
555 return None;
556 }
557 }
558
559 if !filter.account_include.is_empty()
560 && filter
561 .account_include
562 .intersection(msg_account_keys)
563 .next()
564 .is_none()
565 {
566 return None;
567 }
568
569 if !filter.account_exclude.is_empty()
570 && filter
571 .account_exclude
572 .intersection(msg_account_keys)
573 .next()
574 .is_some()
575 {
576 return None;
577 }
578
579 if !filter.account_required.is_empty()
580 && !filter.account_required.is_subset(msg_account_keys)
581 {
582 return None;
583 }
584
585 Some(name.as_ref())
586 })
587 .collect::<FilteredUpdateFilters>();
588
589 filters.is_empty().not().then(|| FilteredUpdate {
590 filters,
591 filtered_update: match self.filter_type {
592 FilterTransactionsType::Transaction => FilteredUpdateType::Transaction { message },
593 FilterTransactionsType::TransactionStatus => {
594 FilteredUpdateType::TransactionStatus { message }
595 }
596 },
597 })
598 }
599}
600
601#[derive(Debug, Default, Clone)]
602struct FilterEntries {
603 filters: Vec<FilterName>,
604}
605
606impl FilterEntries {
607 fn new(names: &mut FilterNames, configs: &HashSet<String>) -> Self {
608 Self {
609 filters: configs.iter().map(|name| names.get(name)).collect(),
610 }
611 }
612
613 fn get_update<'a>(&'a self, message: &'a MessageEntry) -> Option<FilteredUpdate<'a>> {
614 let filters = self
615 .filters
616 .iter()
617 .map(|f| f.as_ref())
618 .collect::<FilteredUpdateFilters>();
619
620 filters.is_empty().not().then(|| FilteredUpdate {
621 filters,
622 filtered_update: FilteredUpdateType::Entry { message },
623 })
624 }
625}
626
627#[derive(Debug, Default, Clone)]
628struct FilterBlocksMeta {
629 filters: Vec<FilterName>,
630}
631
632impl FilterBlocksMeta {
633 fn new(names: &mut FilterNames, configs: &HashSet<String>) -> Self {
634 Self {
635 filters: configs.iter().map(|name| names.get(name)).collect(),
636 }
637 }
638
639 fn get_update<'a>(&'a self, message: &'a MessageBlockMeta) -> Option<FilteredUpdate<'a>> {
640 let filters = self
641 .filters
642 .iter()
643 .map(|f| f.as_ref())
644 .collect::<FilteredUpdateFilters>();
645
646 filters.is_empty().not().then(|| FilteredUpdate {
647 filters,
648 filtered_update: FilteredUpdateType::BlockMeta { message },
649 })
650 }
651}
652
653#[derive(Debug, Clone)]
654struct FilterBlocksInner {
655 account_include: HashSet<Pubkey>,
656 include_transactions: Option<bool>,
657 include_accounts: Option<bool>,
658 include_entries: Option<bool>,
659}
660
661#[derive(Debug, Default, Clone)]
662struct FilterBlocks {
663 filters: HashMap<FilterName, FilterBlocksInner>,
664}
665
666impl FilterBlocks {
667 fn new(names: &mut FilterNames, configs: &HashMap<String, ConfigFilterBlocks>) -> Self {
668 let mut me = Self::default();
669 for (name, filter) in configs {
670 me.filters.insert(
671 names.get(name),
672 FilterBlocksInner {
673 account_include: filter.account_include.iter().copied().collect(),
674 include_transactions: filter.include_transactions,
675 include_accounts: filter.include_accounts,
676 include_entries: filter.include_entries,
677 },
678 );
679 }
680 me
681 }
682
683 fn get_updates<'a>(
684 &'a self,
685 message: &'a MessageBlock,
686 data_slices: &'a FilterAccountDataSlices,
687 ) -> impl Iterator<Item = FilteredUpdate<'a>> {
688 self.filters.iter().map(|(name, filter)| {
689 let accounts = if filter.include_accounts == Some(true) {
690 message
691 .accounts
692 .iter()
693 .enumerate()
694 .filter_map(|(idx, account)| {
695 if !filter.account_include.is_empty()
696 && !filter.account_include.contains(account.pubkey())
697 {
698 None
699 } else {
700 Some(idx)
701 }
702 })
703 .collect::<Vec<_>>()
704 } else {
705 vec![]
706 };
707
708 let transactions = if matches!(filter.include_transactions, None | Some(true)) {
709 message
710 .transactions
711 .iter()
712 .enumerate()
713 .filter_map(|(idx, tx)| {
714 if !filter.account_include.is_empty()
715 && filter
716 .account_include
717 .intersection(tx.account_keys())
718 .next()
719 .is_none()
720 {
721 None
722 } else {
723 Some(idx)
724 }
725 })
726 .collect::<Vec<_>>()
727 } else {
728 vec![]
729 };
730
731 FilteredUpdate {
732 filters: smallvec_inline![name.as_ref(); 8],
733 filtered_update: FilteredUpdateType::Block {
734 message,
735 accounts,
736 transactions,
737 entries: filter.include_entries == Some(true),
738 data_slices,
739 },
740 }
741 })
742 }
743}
744
/// A message that passed filtering, together with the names of the filters
/// that accepted it.
#[derive(Debug, Clone)]
pub struct FilteredUpdate<'a> {
    /// Names of the filters that matched.
    pub filters: FilteredUpdateFilters<'a>,
    /// The matched message plus any per-kind encoding options.
    pub filtered_update: FilteredUpdateType<'a>,
}

impl FilteredUpdate<'_> {
    /// Encodes the update into a freshly allocated byte vector.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`FilteredUpdate::encode`].
    pub fn encode_to_vec(&self) -> Vec<u8> {
        let mut bytes = vec![];
        self.encode(&mut bytes);
        bytes
    }

    /// Encodes the update into `buf`.
    ///
    /// Messages come in two representations. `Limited` variants are encoded
    /// through `SubscribeUpdateMessageLimited`, mostly by referencing ranges
    /// of the message's own `buffer`; `Prost` variants are encoded through
    /// `SubscribeUpdateMessageProst` from cloned field values.
    ///
    /// # Panics
    ///
    /// * if the underlying encoder returns an error (`expect("valid encode")`);
    /// * if a stored write-version varint cannot be decoded
    ///   (`expect("already verified")`);
    /// * if a block mixes representations (the `unreachable!()` arms), or a
    ///   stored range/index is out of bounds.
    pub fn encode(&self, buf: &mut impl BufMut) {
        match &self.filtered_update {
            FilteredUpdateType::Slot { message } => match message {
                MessageSlot::Limited {
                    created_at,
                    buffer,
                    range,
                    ..
                } => SubscribeUpdateMessageLimited {
                    filters: &self.filters,
                    update: UpdateOneofLimitedEncode::Slot(
                        &buffer.as_slice()[range.start..range.end],
                    ),
                    created_at: *created_at,
                }
                .encode(buf),
                MessageSlot::Prost {
                    slot,
                    parent,
                    status,
                    dead_error,
                    created_at,
                    ..
                } => SubscribeUpdateMessageProst {
                    filters: &self.filters,
                    update: UpdateOneof::Slot(SubscribeUpdateSlot {
                        slot: *slot,
                        parent: *parent,
                        status: *status as i32,
                        dead_error: dead_error.clone(),
                    }),
                    created_at: *created_at,
                }
                .encode(buf),
            },
            FilteredUpdateType::Account {
                message,
                data_slices,
            } => match message {
                MessageAccount::Limited {
                    pubkey,
                    owner,
                    lamports,
                    executable,
                    rent_epoch,
                    data,
                    txn_signature_offset,
                    write_version,
                    slot,
                    is_startup,
                    created_at,
                    buffer,
                    account_offset: _,
                    range,
                } => SubscribeUpdateMessageLimited {
                    filters: &self.filters,
                    // Without data slices the stored range is passed through
                    // as is; otherwise the account is rebuilt field by field so
                    // the data can be replaced with the sliced version.
                    update: UpdateOneofLimitedEncode::Account(if data_slices.is_empty() {
                        UpdateOneofLimitedEncodeAccount::Slice(
                            &buffer.as_slice()[range.start..range.end],
                        )
                    } else {
                        UpdateOneofLimitedEncodeAccount::Fields {
                            account: UpdateOneofLimitedEncodeAccountInner {
                                pubkey,
                                lamports: *lamports,
                                owner,
                                executable: *executable,
                                rent_epoch: *rent_epoch,
                                data: data_slices
                                    .get_slice(&buffer.as_slice()[data.start..data.end]),
                                // `write_version` is an offset into `buffer`
                                // where the varint-encoded value starts.
                                write_version: {
                                    let mut buffer = &buffer.as_slice()[*write_version..];
                                    decode_varint(&mut buffer).expect("already verified")
                                },
                                // The signature is read as 64 bytes starting at
                                // the stored offset.
                                txn_signature: txn_signature_offset
                                    .map(|offset| &buffer.as_slice()[offset..offset + 64]),
                            },
                            slot: *slot,
                            is_startup: *is_startup,
                        }
                    }),
                    created_at: *created_at,
                }
                .encode(buf),
                MessageAccount::Prost {
                    account,
                    slot,
                    is_startup,
                    created_at,
                    ..
                } => SubscribeUpdateMessageProst {
                    filters: &self.filters,
                    update: UpdateOneof::Account(SubscribeUpdateAccount {
                        account: Some(SubscribeUpdateAccountInfo {
                            pubkey: account.pubkey.clone(),
                            lamports: account.lamports,
                            owner: account.owner.clone(),
                            executable: account.executable,
                            rent_epoch: account.rent_epoch,
                            data: data_slices.get_slice(&account.data).into_owned(),
                            write_version: account.write_version,
                            txn_signature: account.txn_signature.clone(),
                        }),
                        slot: *slot,
                        is_startup: *is_startup,
                    }),
                    created_at: *created_at,
                }
                .encode(buf),
            },
            FilteredUpdateType::Transaction { message } => match message {
                MessageTransaction::Limited {
                    created_at,
                    buffer,
                    range,
                    ..
                } => SubscribeUpdateMessageLimited {
                    filters: &self.filters,
                    update: UpdateOneofLimitedEncode::Transaction(
                        &buffer.as_slice()[range.start..range.end],
                    ),
                    created_at: *created_at,
                }
                .encode(buf),
                MessageTransaction::Prost {
                    transaction,
                    slot,
                    created_at,
                    ..
                } => SubscribeUpdateMessageProst {
                    filters: &self.filters,
                    update: UpdateOneof::Transaction(SubscribeUpdateTransaction {
                        transaction: Some(transaction.clone()),
                        slot: *slot,
                    }),
                    created_at: *created_at,
                }
                .encode(buf),
            },
            // Status updates carry only slot, signature, vote flag, index and
            // error of the transaction.
            FilteredUpdateType::TransactionStatus { message } => match message {
                MessageTransaction::Limited {
                    error,
                    is_vote,
                    index,
                    slot,
                    created_at,
                    ..
                } => SubscribeUpdateMessageLimited {
                    filters: &self.filters,
                    update: UpdateOneofLimitedEncode::TransactionStatus(
                        UpdateOneofLimitedEncodeTransactionStatus {
                            slot: *slot,
                            signature: message.signature_ref(),
                            is_vote: *is_vote,
                            index: *index,
                            err: error.clone(),
                        },
                    ),
                    created_at: *created_at,
                }
                .encode(buf),
                MessageTransaction::Prost {
                    error,
                    transaction,
                    slot,
                    created_at,
                    ..
                } => SubscribeUpdateMessageProst {
                    filters: &self.filters,
                    update: UpdateOneof::TransactionStatus(SubscribeUpdateTransactionStatus {
                        slot: *slot,
                        signature: message.signature_ref().to_vec(),
                        is_vote: transaction.is_vote,
                        index: transaction.index,
                        err: error.clone(),
                    }),
                    created_at: *created_at,
                }
                .encode(buf),
            },
            FilteredUpdateType::Entry { message } => match message {
                MessageEntry::Limited {
                    created_at,
                    buffer,
                    range,
                    ..
                } => SubscribeUpdateMessageLimited {
                    filters: &self.filters,
                    update: UpdateOneofLimitedEncode::Entry(
                        &buffer.as_slice()[range.start..range.end],
                    ),
                    created_at: *created_at,
                }
                .encode(buf),
                MessageEntry::Prost {
                    entry, created_at, ..
                } => SubscribeUpdateMessageProst {
                    filters: &self.filters,
                    update: UpdateOneof::Entry(entry.clone()),
                    created_at: *created_at,
                }
                .encode(buf),
            },
            FilteredUpdateType::BlockMeta { message } => match message {
                MessageBlockMeta::Limited {
                    created_at,
                    buffer,
                    range,
                    ..
                } => SubscribeUpdateMessageLimited {
                    filters: &self.filters,
                    update: UpdateOneofLimitedEncode::BlockMeta(
                        &buffer.as_slice()[range.start..range.end],
                    ),
                    created_at: *created_at,
                }
                .encode(buf),
                MessageBlockMeta::Prost {
                    block_meta,
                    created_at,
                    ..
                } => SubscribeUpdateMessageProst {
                    filters: &self.filters,
                    update: UpdateOneof::BlockMeta(block_meta.clone()),
                    created_at: *created_at,
                }
                .encode(buf),
            },
            // The representation of a block is selected by `created_at`; all
            // nested messages are expected to use that same representation,
            // hence the `unreachable!()` arms below.
            FilteredUpdateType::Block {
                message,
                accounts,
                transactions,
                entries,
                data_slices,
            } => match message.created_at {
                MessageBlockCreatedAt::Limited(created_at) => {
                    let block_meta = match message.block_meta.as_ref() {
                        MessageBlockMeta::Limited { block_meta, .. } => block_meta,
                        MessageBlockMeta::Prost { .. } => unreachable!(),
                    };

                    SubscribeUpdateMessageLimited {
                        filters: &self.filters,
                        update: UpdateOneofLimitedEncode::Block(UpdateOneofLimitedEncodeBlock {
                            slot: block_meta.slot,
                            blockhash: block_meta.blockhash.as_str(),
                            rewards: block_meta.rewards.clone(),
                            block_time: block_meta.block_time,
                            block_height: block_meta.block_height,
                            parent_slot: block_meta.parent_slot,
                            parent_blockhash: block_meta.parent_blockhash.as_str(),
                            executed_transaction_count: block_meta.executed_transaction_count,
                            // Only the transactions selected by the filter
                            // (by index) are included.
                            transactions: transactions
                                .iter()
                                .map(|idx| match message.transactions[*idx].as_ref() {
                                    MessageTransaction::Limited {
                                        transaction_range,
                                        buffer,
                                        ..
                                    } => &buffer.as_slice()
                                        [transaction_range.start..transaction_range.end],
                                    MessageTransaction::Prost { .. } => unreachable!(),
                                })
                                .collect(),
                            // Counts every account of the block, not only the
                            // selected ones.
                            updated_account_count: message.accounts.len() as u64,
                            accounts: accounts
                                .iter()
                                .map(|idx| match message.accounts[*idx].as_ref() {
                                    MessageAccount::Limited {
                                        pubkey,
                                        owner,
                                        lamports,
                                        executable,
                                        rent_epoch,
                                        data,
                                        txn_signature_offset,
                                        write_version,
                                        buffer,
                                        ..
                                    } => UpdateOneofLimitedEncodeAccountInner {
                                        pubkey,
                                        lamports: *lamports,
                                        owner,
                                        executable: *executable,
                                        rent_epoch: *rent_epoch,
                                        data: data_slices
                                            .get_slice(&buffer.as_slice()[data.start..data.end]),
                                        write_version: {
                                            let mut buffer = &buffer.as_slice()[*write_version..];
                                            decode_varint(&mut buffer).expect("already verified")
                                        },
                                        txn_signature: txn_signature_offset
                                            .map(|offset| &buffer.as_slice()[offset..offset + 64]),
                                    },
                                    MessageAccount::Prost { .. } => unreachable!(),
                                })
                                .collect(),
                            entries_count: block_meta.entries_count,
                            // Entries are all-or-nothing per filter.
                            entries: if *entries {
                                message
                                    .entries
                                    .iter()
                                    .map(|entry| match entry.as_ref() {
                                        MessageEntry::Limited { buffer, range, .. } => {
                                            &buffer.as_slice()[range.start..range.end]
                                        }
                                        MessageEntry::Prost { .. } => unreachable!(),
                                    })
                                    .collect()
                            } else {
                                vec![]
                            },
                        }),
                        created_at,
                    }
                    .encode(buf)
                }
                MessageBlockCreatedAt::Prost(created_at) => {
                    let block_meta = match message.block_meta.as_ref() {
                        MessageBlockMeta::Limited { .. } => unreachable!(),
                        MessageBlockMeta::Prost { block_meta, .. } => block_meta,
                    };

                    SubscribeUpdateMessageProst {
                        filters: &self.filters,
                        update: UpdateOneof::Block(SubscribeUpdateBlock {
                            slot: block_meta.slot,
                            blockhash: block_meta.blockhash.clone(),
                            rewards: block_meta.rewards.clone(),
                            block_time: block_meta.block_time,
                            block_height: block_meta.block_height,
                            parent_slot: block_meta.parent_slot,
                            parent_blockhash: block_meta.parent_blockhash.clone(),
                            executed_transaction_count: block_meta.executed_transaction_count,
                            // Only the transactions selected by the filter
                            // (by index) are included.
                            transactions: transactions
                                .iter()
                                .map(|idx| match message.transactions[*idx].as_ref() {
                                    MessageTransaction::Limited { .. } => unreachable!(),
                                    MessageTransaction::Prost { transaction, .. } => {
                                        transaction.clone()
                                    }
                                })
                                .collect(),
                            // Counts every account of the block, not only the
                            // selected ones.
                            updated_account_count: message.accounts.len() as u64,
                            accounts: accounts
                                .iter()
                                .map(|idx| match message.accounts[*idx].as_ref() {
                                    MessageAccount::Limited { .. } => unreachable!(),
                                    MessageAccount::Prost { account, .. } => {
                                        SubscribeUpdateAccountInfo {
                                            pubkey: account.pubkey.clone(),
                                            lamports: account.lamports,
                                            owner: account.owner.clone(),
                                            executable: account.executable,
                                            rent_epoch: account.rent_epoch,
                                            data: data_slices.get_slice(&account.data).into_owned(),
                                            write_version: account.write_version,
                                            txn_signature: account.txn_signature.clone(),
                                        }
                                    }
                                })
                                .collect(),
                            entries_count: block_meta.entries_count,
                            // Entries are all-or-nothing per filter.
                            entries: if *entries {
                                message
                                    .entries
                                    .iter()
                                    .map(|entry| match entry.as_ref() {
                                        MessageEntry::Limited { .. } => unreachable!(),
                                        MessageEntry::Prost { entry, .. } => entry.clone(),
                                    })
                                    .collect()
                            } else {
                                vec![]
                            },
                        }),
                        created_at,
                    }
                    .encode(buf)
                }
            },
        }
        .expect("valid encode");
    }
}
1143
1144pub type FilteredUpdateFilters<'a> = SmallVec<[&'a str; 8]>;
1145
1146#[derive(Debug, Clone)]
1147pub enum FilteredUpdateType<'a> {
1148 Slot {
1149 message: &'a MessageSlot,
1150 },
1151 Account {
1152 message: &'a MessageAccount,
1153 data_slices: &'a FilterAccountDataSlices,
1154 },
1155 Transaction {
1156 message: &'a MessageTransaction,
1157 },
1158 TransactionStatus {
1159 message: &'a MessageTransaction,
1160 },
1161 Entry {
1162 message: &'a MessageEntry,
1163 },
1164 BlockMeta {
1165 message: &'a MessageBlockMeta,
1166 },
1167 Block {
1168 message: &'a MessageBlock,
1169 accounts: Vec<usize>,
1170 transactions: Vec<usize>,
1171 entries: bool,
1172 data_slices: &'a FilterAccountDataSlices,
1173 },
1174}
1175
1176impl<'a> From<MessageRef<'a>> for FilteredUpdateType<'a> {
1177 fn from(value: MessageRef<'a>) -> Self {
1178 match value {
1179 MessageRef::Slot(message) => Self::Slot { message },
1180 MessageRef::Account(message) => Self::Account {
1181 message,
1182 data_slices: FilterAccountDataSlices::empty(),
1183 },
1184 MessageRef::Transaction(message) => Self::Transaction { message },
1185 MessageRef::Entry(message) => Self::Entry { message },
1186 MessageRef::BlockMeta(message) => Self::BlockMeta { message },
1187 MessageRef::Block(message) => Self::Block {
1188 message,
1189 accounts: (0..message.accounts.len()).collect(),
1190 transactions: (0..message.transactions.len()).collect(),
1191 entries: true,
1192 data_slices: FilterAccountDataSlices::empty(),
1193 },
1194 }
1195 }
1196}