1#![deny(
2 clippy::disallowed_methods,
3 clippy::suspicious,
4 clippy::style,
5 clippy::clone_on_ref_ptr,
6 missing_debug_implementations,
7 missing_copy_implementations
8)]
9#![warn(clippy::pedantic, missing_docs)]
10#![allow(clippy::module_name_repetitions)]
11
12use std::{
19 borrow::Cow,
20 collections::{HashMap, HashSet},
21 fmt::{self, Debug},
22 future::Future,
23 str::FromStr,
24 sync::Arc,
25};
26
27use borsh::{BorshDeserialize, BorshSerialize};
28use serde::Deserialize;
29use yellowstone_grpc_proto::geyser::{
30 self, SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks,
31 SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots,
32 SubscribeRequestFilterTransactions, SubscribeUpdateAccount, SubscribeUpdateAccountInfo,
33 SubscribeUpdateBlock, SubscribeUpdateBlockMeta, SubscribeUpdateSlot,
34 SubscribeUpdateTransaction,
35};
36
// Publicly re-exported so downstream crates can use the same `bs58` version
// this crate uses for base58 encoding/decoding of keys.
pub extern crate bs58;

// Publicly re-exported only when the `proto` feature is enabled.
#[cfg(feature = "proto")]
pub extern crate yellowstone_vixen_proto;

// Submodules; their contents live in separate files not visible from here.
pub mod instruction;
pub mod log_messages;

// Only compiled when the `proto` feature is enabled.
#[cfg(feature = "proto")]
pub mod proto;
47
/// A boxed, thread-safe (`Send + Sync`), `'static` dynamic error; the payload
/// of [`ParseError::Other`].
type BoxedError = Box<dyn std::error::Error + Send + Sync + 'static>;
49
/// An error produced while parsing an update (see [`ParseResult`]).
#[derive(Debug)]
pub enum ParseError {
    /// The input was filtered out rather than parsed.
    ///
    /// NOTE(review): meaning inferred from the variant name; how callers react
    /// to this variant is not visible in this file.
    Filtered,
    /// No matching discriminator was found; carries a descriptive string.
    ///
    /// NOTE(review): the exact content of the string is decided by the parser
    /// that constructs this variant — confirm against callers.
    DiscriminatorNotFound(String),
    /// Any other error, boxed. Every type convertible into a boxed error is
    /// turned into this variant by the blanket `From` impl.
    Other(BoxedError),
}
62
63impl<T: Into<BoxedError>> From<T> for ParseError {
64 #[inline]
65 fn from(value: T) -> Self { Self::Other(value.into()) }
66}
67
/// The result of parsing an update: the parsed value or a [`ParseError`].
pub type ParseResult<T> = Result<T, ParseError>;

/// Alias for the Yellowstone gRPC `SubscribeUpdateAccount` message.
pub type AccountUpdate = SubscribeUpdateAccount;
/// Alias for the Yellowstone gRPC `SubscribeUpdateAccountInfo` message.
pub type AccountUpdateInfo = SubscribeUpdateAccountInfo;
/// Alias for the Yellowstone gRPC `SubscribeUpdateTransaction` message.
pub type TransactionUpdate = SubscribeUpdateTransaction;
/// Alias for the Yellowstone gRPC `SubscribeUpdateBlockMeta` message.
pub type BlockMetaUpdate = SubscribeUpdateBlockMeta;
/// Alias for the Yellowstone gRPC `SubscribeUpdateBlock` message.
pub type BlockUpdate = SubscribeUpdateBlock;
/// Alias for the Yellowstone gRPC `SubscribeUpdateSlot` message.
pub type SlotUpdate = SubscribeUpdateSlot;
83
/// A parsed instruction paired with shared instruction data.
///
/// NOTE(review): presumably the output handed to instruction handlers; the
/// producer of this type is not visible in this file.
#[derive(Debug)]
pub struct InstructionUpdateOutput<T> {
    /// The parsed instruction value.
    pub parsed_ix: T,
    /// Reference-counted [`instruction::InstructionShared`] data associated
    /// with the instruction.
    pub shared_data: Arc<instruction::InstructionShared>,
}
96
/// A parser that turns an input update into a parsed output value.
pub trait Parser {
    /// The type of value this parser consumes.
    type Input;

    /// The type of value this parser produces. With the `proto` feature
    /// enabled the output must also implement `prost::Message`.
    #[cfg(feature = "proto")]
    type Output: ::prost::Message;

    /// The type of value this parser produces.
    #[cfg(not(feature = "proto"))]
    type Output;

    /// Returns an identifier for this parser.
    ///
    /// NOTE(review): presumably expected to be unique per parser, since
    /// [`Filters`] keys prefilters by string — confirm against callers.
    fn id(&self) -> Cow<'static, str>;

    /// Returns the [`Prefilter`] describing which updates this parser wants.
    fn prefilter(&self) -> Prefilter;

    /// Parses `value`, returning a `Send` future that resolves to the parsed
    /// output or a [`ParseError`].
    fn parse(&self, value: &Self::Input) -> impl Future<Output = ParseResult<Self::Output>> + Send;
}
132
/// A [`Parser`] that is associated with a specific program.
pub trait ProgramParser: Parser {
    /// Returns the public key of the program this parser is associated with.
    fn program_id(&self) -> Pubkey;
}
138
/// Helper trait exposing only the identifier of a parser.
pub trait ParserId {
    /// Returns the identifier of this parser.
    fn id(&self) -> Cow<'static, str>;
}

/// `Infallible` has no values, so this method can never actually be called.
impl ParserId for std::convert::Infallible {
    #[inline]
    fn id(&self) -> Cow<'static, str> { match *self {} }
}

/// Every [`Parser`] is a [`ParserId`].
impl<T: Parser> ParserId for T {
    // The fully-qualified `Parser::id` call selects the `Parser` method rather
    // than recursing into `ParserId::id`.
    #[inline]
    fn id(&self) -> Cow<'static, str> { Parser::id(self) }
}
154
/// Helper trait exposing only the prefilter of a parser.
pub trait GetPrefilter {
    /// Returns the [`Prefilter`] of this parser.
    fn prefilter(&self) -> Prefilter;
}

/// `Infallible` has no values, so this method can never actually be called.
impl GetPrefilter for std::convert::Infallible {
    #[inline]
    fn prefilter(&self) -> Prefilter { match *self {} }
}

/// Every [`Parser`] is a [`GetPrefilter`].
impl<T: Parser> GetPrefilter for T {
    // The fully-qualified `Parser::prefilter` call selects the `Parser` method
    // rather than recursing into `GetPrefilter::prefilter`.
    #[inline]
    fn prefilter(&self) -> Prefilter { Parser::prefilter(self) }
}
170
/// A set of optional per-update-kind filters. A `None` field means no filter
/// of that kind is emitted when converting [`Filters`] into a
/// `SubscribeRequest`.
#[derive(Debug, Default, Clone)]
pub struct Prefilter {
    /// Filter for account updates.
    pub account: Option<AccountPrefilter>,
    /// Filter for transaction updates.
    pub transaction: Option<TransactionPrefilter>,
    /// Filter for block metadata updates.
    pub block_meta: Option<BlockMetaPrefilter>,
    /// Filter for block updates.
    pub block: Option<BlockPrefilter>,
    /// Filter for slot updates.
    pub slot: Option<SlotPrefilter>,
}
186
/// Merges the optional value `rhs` into `lhs`.
///
/// - If `rhs` is `None`, `lhs` is left untouched.
/// - If `lhs` is `None`, it takes the value of `rhs`.
/// - If both hold a value, `f` combines them in place.
fn merge_opt<T, F: FnOnce(&mut T, T)>(lhs: &mut Option<T>, rhs: Option<T>, f: F) {
    let Some(incoming) = rhs else {
        return;
    };

    match lhs.as_mut() {
        Some(existing) => f(existing, incoming),
        None => *lhs = Some(incoming),
    }
}
194
195impl Prefilter {
196 #[inline]
198 pub fn builder() -> PrefilterBuilder { PrefilterBuilder::default() }
199
200 pub fn merge(&mut self, other: Prefilter) {
203 let Self {
204 account,
205 transaction,
206 block_meta,
207 block,
208 slot,
209 } = self;
210 merge_opt(account, other.account, AccountPrefilter::merge);
211 merge_opt(transaction, other.transaction, TransactionPrefilter::merge);
212 merge_opt(block_meta, other.block_meta, BlockMetaPrefilter::merge);
213 merge_opt(block, other.block, BlockPrefilter::merge);
214 merge_opt(slot, other.slot, SlotPrefilter::merge);
215 }
216}
217
218impl FromIterator<Prefilter> for Prefilter {
219 fn from_iter<T: IntoIterator<Item = Prefilter>>(iter: T) -> Self {
220 let mut iter = iter.into_iter();
221 let Some(ret) = iter.next() else {
222 return Self::default();
223 };
224 iter.fold(ret, |mut l, r| {
225 l.merge(r);
226 l
227 })
228 }
229}
230
231#[derive(Debug, Default, Clone, PartialEq)]
233pub struct AccountPrefilter {
234 pub accounts: HashSet<Pubkey>,
236 pub owners: HashSet<Pubkey>,
238}
239
240impl AccountPrefilter {
241 pub fn merge(&mut self, other: AccountPrefilter) {
244 let Self { accounts, owners } = self;
245 accounts.extend(other.accounts);
246 owners.extend(other.owners);
247 }
248}
249
250#[derive(Debug, Clone, PartialEq)]
252pub struct TransactionPrefilter {
253 pub accounts_include: HashSet<Pubkey>,
256 pub accounts_required: HashSet<Pubkey>,
260 pub failed: Option<bool>,
265}
266
267impl Default for TransactionPrefilter {
268 fn default() -> Self {
269 Self {
270 accounts_include: HashSet::new(),
271 accounts_required: HashSet::new(),
272 failed: Some(false), }
274 }
275}
276
277impl TransactionPrefilter {
278 pub fn merge(&mut self, other: TransactionPrefilter) {
281 let Self {
282 accounts_include,
283 accounts_required,
284 failed,
285 } = self;
286
287 accounts_include.extend(other.accounts_include);
288 accounts_required.extend(other.accounts_required);
289
290 if other.failed.is_none() {
291 *failed = None;
292 }
293 }
294}
295
/// A filter for block metadata updates. It has no fields; its presence alone
/// requests block metadata updates.
#[derive(Debug, Default, Clone, PartialEq, Copy)]
pub struct BlockMetaPrefilter {}

impl BlockMetaPrefilter {
    /// Merges two block metadata prefilters. There is no state to combine, so
    /// this does nothing.
    pub fn merge(_lhs: &mut Self, _rhs: Self) {}
}
305
306#[derive(Debug, Default, Clone, PartialEq)]
308pub struct BlockPrefilter {
309 pub accounts_include: HashSet<Pubkey>,
311 pub include_transactions: bool,
313 pub include_accounts: bool,
315 pub include_entries: bool,
317}
318
319impl BlockPrefilter {
320 pub fn merge(&mut self, other: BlockPrefilter) {
322 let Self {
323 accounts_include,
324 include_transactions,
325 include_accounts,
326 include_entries,
327 } = self;
328
329 accounts_include.extend(other.accounts_include);
330 *include_accounts |= other.include_accounts;
331 *include_transactions |= other.include_transactions;
332 *include_entries |= other.include_entries;
333 }
334}
335
/// A filter for slot updates.
#[derive(Debug, Clone, PartialEq, Copy)]
pub struct SlotPrefilter {
    /// Sent as the `filter_by_commitment` option of the subscription filter.
    pub filter_by_commitment: bool,
}

impl Default for SlotPrefilter {
    /// Defaults to filtering by commitment.
    fn default() -> Self {
        SlotPrefilter {
            filter_by_commitment: true,
        }
    }
}

impl SlotPrefilter {
    /// Merges `rhs` into `lhs`. The result filters by commitment only if both
    /// sides do; a single side with the flag cleared clears it in the result.
    pub fn merge(lhs: &mut Self, rhs: Self) {
        lhs.filter_by_commitment &= rhs.filter_by_commitment;
    }
}
361
/// Generates two crate-private conversion helpers between the given key type
/// and this crate's [`Pubkey`]:
///
/// - `into_vixen_pubkey(value: $ty) -> Pubkey`, implemented as
///   `value.to_bytes().into()`;
/// - `from_vixen_pubkey(value: Pubkey) -> $ty`, implemented as
///   `value.into_bytes().into()`.
///
/// `$ty` must therefore provide a `to_bytes()` method whose result converts
/// into [`Pubkey`], and must be constructible from `[u8; 32]` via `From`.
#[macro_export]
macro_rules! pubkey_convert_helpers {
    ($ty:ty) => {
        pub(crate) fn into_vixen_pubkey(value: $ty) -> $crate::Pubkey { value.to_bytes().into() }

        pub(crate) fn from_vixen_pubkey(value: $crate::Pubkey) -> $ty { value.into_bytes().into() }
    };
}
385
/// A 32-byte public key, represented as [`KeyBytes<32>`].
pub type Pubkey = KeyBytes<32>;
391
/// Protobuf message wrapping a key as a single `bytes` field (tag 1). Used by
/// the `prost::Message` implementation of [`KeyBytes<32>`].
#[cfg(feature = "proto")]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PublicKeyProtoWrapper {
    /// The raw key bytes.
    #[prost(bytes = "vec", tag = "1")]
    pub value: Vec<u8>,
}

#[cfg(feature = "proto")]
impl PublicKeyProtoWrapper {
    /// Creates a wrapper from anything convertible into a byte vector.
    pub fn new(value: impl Into<Vec<u8>>) -> Self {
        let value: Vec<u8> = value.into();
        Self { value }
    }
}
417
/// A fixed-size key of `LEN` bytes. `#[repr(transparent)]` gives it the same
/// layout as the wrapped `[u8; LEN]`.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
#[repr(transparent)]
pub struct KeyBytes<const LEN: usize>(pub [u8; LEN]);
423
424impl<const LEN: usize> Default for KeyBytes<LEN> {
425 fn default() -> Self { Self([0u8; LEN]) }
426}
427
428impl<const LEN: usize> Debug for KeyBytes<LEN> {
429 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
430 f.debug_tuple("KeyBytes")
431 .field(&bs58::encode(self.0).into_string())
432 .finish()
433 }
434}
435
436impl<const LEN: usize> fmt::Display for KeyBytes<LEN> {
437 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
438 f.write_str(&bs58::encode(self.0).into_string())
439 }
440}
441
442impl<const LEN: usize> From<[u8; LEN]> for KeyBytes<LEN> {
443 #[inline]
444 fn from(value: [u8; LEN]) -> Self { Self(value) }
445}
446
447impl<const LEN: usize> From<KeyBytes<LEN>> for [u8; LEN] {
448 #[inline]
449 fn from(value: KeyBytes<LEN>) -> Self { value.0 }
450}
451
452impl<const LEN: usize> std::ops::Deref for KeyBytes<LEN> {
453 type Target = [u8; LEN];
454
455 fn deref(&self) -> &Self::Target { &self.0 }
456}
457
458impl<const LEN: usize> std::ops::DerefMut for KeyBytes<LEN> {
459 fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 }
460}
461
462impl<const LEN: usize> AsRef<[u8; LEN]> for KeyBytes<LEN> {
463 fn as_ref(&self) -> &[u8; LEN] { self }
464}
465
466impl<const LEN: usize> AsMut<[u8; LEN]> for KeyBytes<LEN> {
467 fn as_mut(&mut self) -> &mut [u8; LEN] { self }
468}
469
470impl<const LEN: usize> std::borrow::Borrow<[u8; LEN]> for KeyBytes<LEN> {
471 fn borrow(&self) -> &[u8; LEN] { self }
472}
473
474impl<const LEN: usize> std::borrow::BorrowMut<[u8; LEN]> for KeyBytes<LEN> {
475 fn borrow_mut(&mut self) -> &mut [u8; LEN] { self }
476}
477
478impl<const LEN: usize> AsRef<[u8]> for KeyBytes<LEN> {
479 fn as_ref(&self) -> &[u8] { self.as_slice() }
480}
481
482impl<const LEN: usize> AsMut<[u8]> for KeyBytes<LEN> {
483 fn as_mut(&mut self) -> &mut [u8] { self.as_mut_slice() }
484}
485
486impl<const LEN: usize> std::borrow::Borrow<[u8]> for KeyBytes<LEN> {
487 fn borrow(&self) -> &[u8] { self.as_ref() }
488}
489
490impl<const LEN: usize> std::borrow::BorrowMut<[u8]> for KeyBytes<LEN> {
491 fn borrow_mut(&mut self) -> &mut [u8] { self.as_mut() }
492}
493
494type KeyFromSliceError = std::array::TryFromSliceError;
495
496impl<const LEN: usize> TryFrom<&[u8]> for KeyBytes<LEN> {
497 type Error = KeyFromSliceError;
498
499 #[inline]
500 fn try_from(value: &[u8]) -> Result<Self, Self::Error> { value.try_into().map(Self) }
501}
502
503impl<const LEN: usize> KeyBytes<LEN> {
504 #[must_use]
506 pub fn new(bytes: [u8; LEN]) -> Self { bytes.into() }
507
508 #[must_use]
510 pub fn into_bytes(self) -> [u8; LEN] { self.into() }
511
512 pub fn try_from_ref<T: AsRef<[u8]>>(key: T) -> Result<Self, KeyFromSliceError> {
518 key.as_ref().try_into()
519 }
520
521 pub fn equals_ref<T: AsRef<[u8]>>(&self, other: T) -> bool {
524 self.as_slice().eq(other.as_ref())
525 }
526}
527
528impl<const LEN: usize> BorshSerialize for KeyBytes<LEN> {
529 fn serialize<W: std::io::Write>(&self, writer: &mut W) -> std::io::Result<()> {
530 self.0.serialize(writer)
531 }
532}
533
534impl<const LEN: usize> BorshDeserialize for KeyBytes<LEN> {
535 fn deserialize_reader<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self> {
536 let bytes = <[u8; LEN]>::deserialize_reader(reader)?;
537 Ok(Self(bytes))
538 }
539}
540
/// Encodes a 32-byte key as a [`PublicKeyProtoWrapper`] message, i.e. a single
/// `bytes` field, and decodes it the same way.
#[cfg(feature = "proto")]
impl ::prost::Message for KeyBytes<32> {
    fn encode_raw(&self, buf: &mut impl ::prost::bytes::BufMut) {
        let wrapper = PublicKeyProtoWrapper::new(self.0);
        wrapper.encode_raw(buf);
    }

    /// Decodes one field through a temporary wrapper seeded with the current
    /// key, then copies the result back.
    ///
    /// # Errors
    /// Propagates the wrapper's decode error, and fails if the wrapper does
    /// not hold exactly 32 bytes after decoding. `self` is left unchanged on
    /// error.
    fn merge_field(
        &mut self,
        tag: u32,
        wire_type: ::prost::encoding::WireType,
        buf: &mut impl ::prost::bytes::Buf,
        ctx: ::prost::encoding::DecodeContext,
    ) -> ::core::result::Result<(), ::prost::DecodeError> {
        let mut wrapper = PublicKeyProtoWrapper::new(self.0);
        wrapper.merge_field(tag, wire_type, buf, ctx)?;

        // The array conversion succeeds exactly when the length is 32.
        match <[u8; 32]>::try_from(wrapper.value.as_slice()) {
            Ok(key) => {
                self.0 = key;
                Ok(())
            },
            Err(_) => {
                #[allow(deprecated)]
                let err = ::prost::DecodeError::new("expected exactly 32 bytes for Pubkey");
                Err(err)
            },
        }
    }

    fn encoded_len(&self) -> usize {
        let wrapper = PublicKeyProtoWrapper::new(self.0);
        wrapper.encoded_len()
    }

    /// Resets the key to all zero bytes.
    fn clear(&mut self) { self.0 = [0; 32]; }
}
582
/// An error from parsing a [`KeyBytes`] of `LEN` bytes out of a base58 string.
#[derive(Debug, Clone, Copy, thiserror::Error)]
pub enum KeyFromStrError<const LEN: usize = 32> {
    /// The string could not be base58-decoded.
    #[error("Invalid base58 string")]
    Bs58(#[from] bs58::decode::Error),
    /// The decoded bytes did not have exactly `LEN` bytes.
    #[error("Invalid key length, must be {LEN} bytes")]
    Len(#[from] std::array::TryFromSliceError),
}
593
594impl<const LEN: usize> FromStr for KeyBytes<LEN> {
595 type Err = KeyFromStrError<LEN>;
596
597 fn from_str(s: &str) -> Result<Self, Self::Err> {
598 bs58::decode(s)
599 .into_vec()?
600 .as_slice()
601 .try_into()
602 .map_err(Into::into)
603 }
604}
605
606impl<const LEN: usize> TryFrom<&str> for KeyBytes<LEN> {
607 type Error = KeyFromStrError<LEN>;
608
609 fn try_from(value: &str) -> Result<Self, Self::Error> { value.parse() }
610}
611
612impl<const LEN: usize> TryFrom<String> for KeyBytes<LEN> {
613 type Error = KeyFromStrError<LEN>;
614
615 fn try_from(value: String) -> Result<Self, Self::Error> { value.parse() }
616}
617
618impl<const LEN: usize> TryFrom<Cow<'_, str>> for KeyBytes<LEN> {
619 type Error = KeyFromStrError<LEN>;
620
621 fn try_from(value: Cow<str>) -> Result<Self, Self::Error> { value.parse() }
622}
623
/// An error that can occur while building a [`Prefilter`] with
/// [`PrefilterBuilder`].
#[derive(Debug, Clone, thiserror::Error)]
pub enum PrefilterError {
    /// A builder field was given a value more than once; carries the field
    /// name.
    #[error("Value already given for field {0}")]
    AlreadySet(&'static str),
    /// A supplied public key could not be converted; carries the offending
    /// bytes (shown base58-encoded in the message) and the conversion error.
    #[error("Invalid pubkey {}", bs58::encode(.0).into_string())]
    BadPubkey(Vec<u8>, std::array::TryFromSliceError),
}
634
/// A builder for [`Prefilter`]. Errors raised by the setter methods are stored
/// and reported by [`PrefilterBuilder::build`].
#[derive(Debug, Default)]
#[must_use = "Consider calling .build() on this builder"]
#[allow(clippy::struct_excessive_bools)]
pub struct PrefilterBuilder {
    // First error recorded by a setter, if any; returned by `build`.
    error: Option<PrefilterError>,
    // Whether a slot prefilter is emitted.
    slots: bool,
    // Whether a block metadata prefilter is emitted.
    block_metas: bool,
    // Accounts for the block prefilter's `accounts_include` set.
    block_accounts_include: Option<HashSet<Pubkey>>,
    // Value of the block prefilter's `include_accounts` flag.
    block_include_accounts: bool,
    // Value of the block prefilter's `include_transactions` flag.
    block_include_transactions: bool,
    // Value of the block prefilter's `include_entries` flag.
    block_include_entries: bool,
    // When set, `build` emits an empty account prefilter regardless of
    // `accounts` / `account_owners`.
    accounts_include_all: bool,
    // Accounts for the account prefilter's `accounts` set.
    accounts: Option<HashSet<Pubkey>>,
    // Owners for the account prefilter's `owners` set.
    account_owners: Option<HashSet<Pubkey>>,
    // Accounts for the transaction prefilter's `accounts_include` set.
    transaction_accounts_include: Option<HashSet<Pubkey>>,
    // Accounts for the transaction prefilter's `accounts_required` set.
    transaction_accounts_required: Option<HashSet<Pubkey>>,
}
662
663fn set_opt<T>(opt: &mut Option<T>, field: &'static str, val: T) -> Result<(), PrefilterError> {
664 if opt.is_some() {
665 return Err(PrefilterError::AlreadySet(field));
666 }
667
668 *opt = Some(val);
669 Ok(())
670}
671
672fn collect_pubkeys<I: IntoIterator>(it: I) -> Result<HashSet<Pubkey>, PrefilterError>
674where I::Item: AsRef<[u8]> {
675 it.into_iter()
676 .map(|p| {
677 let p = p.as_ref();
678 p.try_into()
679 .map_err(|e| PrefilterError::BadPubkey(p.to_vec(), e))
680 })
681 .collect()
682}
683
684impl PrefilterBuilder {
685 pub fn build(self) -> Result<Prefilter, PrefilterError> {
690 let PrefilterBuilder {
691 error,
692 accounts_include_all,
693 accounts,
694 account_owners,
695 slots,
696 block_metas,
697 block_accounts_include,
698 block_include_accounts,
699 block_include_entries,
700 block_include_transactions,
701 transaction_accounts_include,
702 transaction_accounts_required,
703 } = self;
704 if let Some(err) = error {
705 return Err(err);
706 }
707
708 let account = AccountPrefilter {
709 accounts: accounts.unwrap_or_default(),
710 owners: account_owners.unwrap_or_default(),
711 };
712
713 let transaction = TransactionPrefilter {
714 accounts_include: transaction_accounts_include.unwrap_or_default(),
715 accounts_required: transaction_accounts_required.unwrap_or_default(),
716 ..Default::default()
717 };
718
719 let block_meta = BlockMetaPrefilter {};
720
721 let block = BlockPrefilter {
722 accounts_include: block_accounts_include.unwrap_or_default(),
723 include_accounts: block_include_accounts,
724 include_transactions: block_include_transactions,
725 include_entries: block_include_entries,
726 };
727
728 let slot = SlotPrefilter::default();
729
730 let account = if accounts_include_all {
731 Some(AccountPrefilter::default())
732 } else {
733 (account != AccountPrefilter::default()).then_some(account)
734 };
735
736 Ok(Prefilter {
737 account,
738 transaction: (transaction != TransactionPrefilter::default()).then_some(transaction),
739 block_meta: block_metas.then_some(block_meta),
740 block: (block != BlockPrefilter::default()).then_some(block),
741 slot: slots.then_some(slot),
742 })
743 }
744
745 fn mutate<F: FnOnce(&mut Self) -> Result<(), PrefilterError>>(mut self, f: F) -> Self {
746 if self.error.is_none() {
747 self.error = f(&mut self).err();
748 }
749
750 self
751 }
752
753 pub fn slots(self) -> Self {
755 self.mutate(|this| {
756 this.slots = true;
757 Ok(())
758 })
759 }
760
761 pub fn block_metas(self) -> Self {
763 self.mutate(|this| {
764 this.block_metas = true;
765 Ok(())
766 })
767 }
768
769 pub fn accounts_include_all(self) -> Self {
771 self.mutate(|this| {
772 this.accounts_include_all = true;
773 Ok(())
774 })
775 }
776
777 pub fn accounts<I: IntoIterator>(self, it: I) -> Self
779 where I::Item: AsRef<[u8]> {
780 self.mutate(|this| set_opt(&mut this.accounts, "accounts", collect_pubkeys(it)?))
781 }
782
783 pub fn account_owners<I: IntoIterator>(self, it: I) -> Self
785 where I::Item: AsRef<[u8]> {
786 self.mutate(|this| {
787 set_opt(
788 &mut this.account_owners,
789 "account_owners",
790 collect_pubkeys(it)?,
791 )
792 })
793 }
794
795 pub fn transaction_accounts<I: IntoIterator>(self, it: I) -> Self
801 where I::Item: AsRef<[u8]> {
802 self.mutate(|this| {
803 set_opt(
804 &mut this.transaction_accounts_required,
805 "transaction_accounts_required",
806 collect_pubkeys(it)?,
807 )
808 })
809 }
810
811 pub fn transaction_accounts_include<I: IntoIterator>(self, it: I) -> Self
816 where I::Item: AsRef<[u8]> {
817 self.mutate(|this| {
818 set_opt(
819 &mut this.transaction_accounts_include,
820 "transaction_accounts_include",
821 collect_pubkeys(it)?,
822 )
823 })
824 }
825
826 pub fn block_accounts_include<I: IntoIterator>(self, it: I) -> Self
828 where I::Item: AsRef<[u8]> {
829 self.mutate(|this| {
830 set_opt(
831 &mut this.block_accounts_include,
832 "block_accounts_include",
833 collect_pubkeys(it)?,
834 )
835 })
836 }
837
838 pub fn block_include_accounts(self) -> Self {
840 self.mutate(|this| {
841 this.block_include_accounts = true;
842 Ok(())
843 })
844 }
845
846 pub fn block_include_transactions(self) -> Self {
848 self.mutate(|this| {
849 this.block_include_transactions = true;
850 Ok(())
851 })
852 }
853
854 pub fn block_include_entries(self) -> Self {
856 self.mutate(|this| {
857 this.block_include_entries = true;
858 Ok(())
859 })
860 }
861}
862
863#[derive(Debug, Clone)]
865pub struct Filters {
866 pub parsers_filters: HashMap<String, Prefilter>,
868}
869
870impl Filters {
871 #[inline]
873 #[must_use]
874 pub fn new(filters: HashMap<String, Prefilter>) -> Self {
875 Self {
876 parsers_filters: filters,
877 }
878 }
879}
880
/// A commitment level, mirroring `geyser::CommitmentLevel`. It deserializes
/// from lowercase variant names and can be used as a `clap` value enum.
#[derive(Debug, Clone, Copy, Deserialize, clap::ValueEnum)]
#[serde(rename_all = "lowercase")]
pub enum CommitmentLevel {
    /// Corresponds to `geyser::CommitmentLevel::Processed`.
    Processed,
    /// Corresponds to `geyser::CommitmentLevel::Confirmed`.
    Confirmed,
    /// Corresponds to `geyser::CommitmentLevel::Finalized`.
    Finalized,
}
893
894impl From<geyser::CommitmentLevel> for CommitmentLevel {
895 fn from(value: geyser::CommitmentLevel) -> Self {
896 match value {
897 geyser::CommitmentLevel::Processed => Self::Processed,
898 geyser::CommitmentLevel::Confirmed => Self::Confirmed,
899 geyser::CommitmentLevel::Finalized => Self::Finalized,
900 }
901 }
902}
903
904impl From<Filters> for SubscribeRequest {
905 fn from(value: Filters) -> Self {
906 SubscribeRequest {
907 accounts: value
908 .parsers_filters
909 .iter()
910 .filter_map(|(k, v)| {
911 let v = v.account.as_ref()?;
912
913 Some((k.clone(), SubscribeRequestFilterAccounts {
914 account: v.accounts.iter().map(ToString::to_string).collect(),
915 owner: v.owners.iter().map(ToString::to_string).collect(),
916 filters: vec![],
918 nonempty_txn_signature: None,
920 }))
921 })
922 .collect(),
923 slots: value
924 .parsers_filters
925 .iter()
926 .filter_map(|(k, v)| {
927 let slot_filter = v.slot.as_ref()?;
928 Some((k.clone(), SubscribeRequestFilterSlots {
929 filter_by_commitment: Some(slot_filter.filter_by_commitment),
930 interslot_updates: None,
931 }))
932 })
933 .collect(),
934 transactions: value
935 .parsers_filters
936 .iter()
937 .filter_map(|(k, v)| {
938 let v = v.transaction.as_ref()?;
939
940 Some((k.clone(), SubscribeRequestFilterTransactions {
941 vote: None,
942 failed: v.failed,
943 signature: None,
944 account_include: v
945 .accounts_include
946 .iter()
947 .map(ToString::to_string)
948 .collect(),
949 account_exclude: [].into_iter().collect(),
950 account_required: v
951 .accounts_required
952 .iter()
953 .map(ToString::to_string)
954 .collect(),
955 }))
956 })
957 .collect(),
958 transactions_status: [].into_iter().collect(),
959 blocks: value
960 .parsers_filters
961 .iter()
962 .filter_map(|(k, v)| {
963 let v = v.block.as_ref()?;
964
965 Some((k.clone(), SubscribeRequestFilterBlocks {
966 account_include: v
967 .accounts_include
968 .iter()
969 .map(ToString::to_string)
970 .collect(),
971 include_transactions: Some(v.include_transactions),
972 include_accounts: Some(v.include_accounts),
973 include_entries: Some(v.include_entries),
974 }))
975 })
976 .collect(),
977 blocks_meta: value
978 .parsers_filters
979 .iter()
980 .filter_map(|(k, v)| {
981 v.block_meta?;
982 Some((k.clone(), SubscribeRequestFilterBlocksMeta {}))
983 })
984 .collect(),
985 entry: [].into_iter().collect(),
986 commitment: None,
987 accounts_data_slice: vec![],
988 ping: None,
989 from_slot: None,
990 }
991 }
992}
993
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `BlockPrefilter` with no accounts and the given flags.
    fn block_prefilter(
        include_accounts: bool,
        include_transactions: bool,
        include_entries: bool,
    ) -> BlockPrefilter {
        BlockPrefilter {
            include_accounts,
            include_transactions,
            include_entries,
            accounts_include: HashSet::new(),
        }
    }

    /// Returns `lhs` after merging `rhs` into it.
    fn merged(mut lhs: BlockPrefilter, rhs: BlockPrefilter) -> BlockPrefilter {
        lhs.merge(rhs);
        lhs
    }

    /// Wraps a block prefilter in an otherwise empty `Prefilter`.
    fn with_block(block: BlockPrefilter) -> Prefilter {
        Prefilter {
            block: Some(block),
            ..Prefilter::default()
        }
    }

    #[test]
    fn test_block_prefilter_merge_basic_union() {
        let a = merged(
            block_prefilter(true, false, false),
            block_prefilter(false, true, false),
        );

        assert!(
            a.include_accounts,
            "BUG: include_accounts was true, should remain true after merge with false"
        );
        assert!(
            a.include_transactions,
            "include_transactions should be true after merge"
        );
        assert!(
            !a.include_entries,
            "include_entries should remain false (neither requested)"
        );
    }

    #[test]
    fn test_block_prefilter_merge_idempotence() {
        let original = block_prefilter(true, false, true);

        let a = merged(original.clone(), original.clone());

        assert_eq!(a, original, "merge(A, A) should equal A (idempotence)");
    }

    #[test]
    fn test_block_prefilter_merge_commutativity() {
        let a_orig = block_prefilter(true, false, true);
        let b_orig = block_prefilter(false, true, false);

        let a = merged(a_orig.clone(), b_orig.clone());
        let b = merged(b_orig, a_orig);

        assert_eq!(a, b, "merge(A, B) should equal merge(B, A) (commutativity)");
    }

    #[test]
    fn test_block_prefilter_merge_associativity() {
        let a_orig = block_prefilter(true, false, false);
        let b_orig = block_prefilter(false, true, false);
        let c_orig = block_prefilter(false, false, true);

        let abc_left = merged(merged(a_orig.clone(), b_orig.clone()), c_orig.clone());
        let abc_right = merged(a_orig, merged(b_orig, c_orig));

        assert_eq!(
            abc_left, abc_right,
            "merge(merge(A, B), C) should equal merge(A, merge(B, C)) (associativity)"
        );
    }

    #[test]
    fn test_block_prefilter_merge_identity() {
        let original = block_prefilter(true, true, false);

        let a = merged(original.clone(), BlockPrefilter::default());

        assert_eq!(
            a, original,
            "merge(A, default) should equal A (identity element)"
        );
    }

    #[test]
    fn test_block_prefilter_merge_monotonicity() {
        let a = merged(
            block_prefilter(true, true, true),
            block_prefilter(false, false, false),
        );

        assert!(
            a.include_accounts,
            "BUG: include_accounts was true, must remain true after merge"
        );
        assert!(
            a.include_transactions,
            "BUG: include_transactions was true, must remain true after merge"
        );
        assert!(
            a.include_entries,
            "BUG: include_entries was true, must remain true after merge"
        );
    }

    #[test]
    fn test_block_prefilter_merge_truth_table() {
        let cases = [(false, false), (false, true), (true, false), (true, true)];

        for (lhs, rhs) in cases {
            let expected = lhs || rhs;

            let a = merged(
                block_prefilter(lhs, false, false),
                block_prefilter(rhs, false, false),
            );
            assert_eq!(
                a.include_accounts, expected,
                "include_accounts: {lhs} OR {rhs} should be {expected}"
            );

            let a = merged(
                block_prefilter(false, lhs, false),
                block_prefilter(false, rhs, false),
            );
            assert_eq!(
                a.include_transactions, expected,
                "include_transactions: {lhs} OR {rhs} should be {expected}"
            );

            let a = merged(
                block_prefilter(false, false, lhs),
                block_prefilter(false, false, rhs),
            );
            assert_eq!(
                a.include_entries, expected,
                "include_entries: {lhs} OR {rhs} should be {expected}"
            );
        }
    }

    #[test]
    fn test_block_prefilter_merge_hashset_union() {
        let key1: Pubkey = [1u8; 32].into();
        let key2: Pubkey = [2u8; 32].into();

        let only = |key: Pubkey| BlockPrefilter {
            accounts_include: HashSet::from([key]),
            ..block_prefilter(false, false, false)
        };

        let a = merged(only(key1), only(key2));

        assert!(
            a.accounts_include.contains(&key1),
            "key1 should be in merged set"
        );
        assert!(
            a.accounts_include.contains(&key2),
            "key2 should be in merged set"
        );
        assert_eq!(a.accounts_include.len(), 2, "merged set should have 2 keys");
    }

    #[test]
    fn test_prefilter_merge_block_or_semantics() {
        let mut p1 = with_block(block_prefilter(true, false, false));
        let p2 = with_block(block_prefilter(false, true, false));

        p1.merge(p2);

        let block = p1.block.expect("block prefilter should exist after merge");
        assert!(
            block.include_accounts,
            "Prefilter merge: include_accounts should be true"
        );
        assert!(
            block.include_transactions,
            "Prefilter merge: include_transactions should be true"
        );
    }

    #[test]
    fn test_prefilter_from_iterator_block_or_semantics() {
        let parts = vec![
            with_block(block_prefilter(true, false, false)),
            with_block(block_prefilter(false, true, false)),
            with_block(block_prefilter(false, false, true)),
        ];

        let combined = parts.into_iter().collect::<Prefilter>();

        let block = combined
            .block
            .expect("block prefilter should exist after collect");
        assert!(
            block.include_accounts,
            "FromIterator: include_accounts should be true"
        );
        assert!(
            block.include_transactions,
            "FromIterator: include_transactions should be true"
        );
        assert!(
            block.include_entries,
            "FromIterator: include_entries should be true"
        );
    }
}