1#![deny(
2 clippy::disallowed_methods,
3 clippy::suspicious,
4 clippy::style,
5 clippy::clone_on_ref_ptr,
6 missing_debug_implementations,
7 missing_copy_implementations
8)]
9#![warn(clippy::pedantic, missing_docs)]
10#![allow(clippy::module_name_repetitions)]
11
12use std::{
19 borrow::Cow,
20 collections::{HashMap, HashSet},
21 fmt::{self, Debug},
22 future::Future,
23 str::FromStr,
24 sync::Arc,
25};
26
27use serde::Deserialize;
28use yellowstone_grpc_proto::geyser::{
29 self, SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks,
30 SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots,
31 SubscribeRequestFilterTransactions, SubscribeUpdateAccount, SubscribeUpdateAccountInfo,
32 SubscribeUpdateBlock, SubscribeUpdateBlockMeta, SubscribeUpdateSlot,
33 SubscribeUpdateTransaction,
34};
35
36pub extern crate bs58;
37#[cfg(feature = "proto")]
38pub extern crate yellowstone_vixen_proto;
39
40pub mod instruction;
41#[cfg(feature = "proto")]
42pub mod proto;
43
/// Boxed, thread-safe error trait object stored inside `ParseError::Other`.
type BoxedError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// The error type returned by parsers (see [`ParseResult`]).
#[derive(Debug)]
pub enum ParseError {
    /// Payload-free variant.  Judging by its name it marks input that was
    /// skipped rather than input that failed to parse — confirm against the
    /// callers that match on it.
    Filtered,
    /// Any other failure, carried as a boxed error.
    Other(BoxedError),
}

// Blanket conversion: anything that can become a boxed error is wrapped in
// `ParseError::Other`, which lets parser code use `?` on arbitrary errors.
impl<T: Into<BoxedError>> From<T> for ParseError {
    #[inline]
    fn from(value: T) -> Self {
        let boxed: BoxedError = value.into();
        ParseError::Other(boxed)
    }
}

/// Shorthand for a [`Result`] whose error type is [`ParseError`].
pub type ParseResult<T> = Result<T, ParseError>;
64
65pub type AccountUpdate = SubscribeUpdateAccount;
67pub type AccountUpdateInfo = SubscribeUpdateAccountInfo;
69pub type TransactionUpdate = SubscribeUpdateTransaction;
71pub type BlockMetaUpdate = SubscribeUpdateBlockMeta;
73pub type BlockUpdate = SubscribeUpdateBlock;
75pub type SlotUpdate = SubscribeUpdateSlot;
77
78#[derive(Debug)]
84pub struct InstructionUpdateOutput<T> {
85 pub parsed_ix: T,
87 pub shared_data: Arc<instruction::InstructionShared>,
89}
90
91pub trait Parser {
95 type Input;
97 type Output;
99
100 fn id(&self) -> Cow<'static, str>;
110
111 fn prefilter(&self) -> Prefilter;
114
115 fn parse(&self, value: &Self::Input) -> impl Future<Output = ParseResult<Self::Output>> + Send;
117}
118
119pub trait ProgramParser: Parser {
121 fn program_id(&self) -> Pubkey;
123}
124
125pub trait ParserId {
127 fn id(&self) -> Cow<'static, str>;
129}
130
131impl ParserId for std::convert::Infallible {
132 #[inline]
133 fn id(&self) -> Cow<'static, str> { match *self {} }
134}
135
136impl<T: Parser> ParserId for T {
137 #[inline]
138 fn id(&self) -> Cow<'static, str> { Parser::id(self) }
139}
140
141pub trait GetPrefilter {
143 fn prefilter(&self) -> Prefilter;
145}
146
147impl GetPrefilter for std::convert::Infallible {
148 #[inline]
149 fn prefilter(&self) -> Prefilter { match *self {} }
150}
151
152impl<T: Parser> GetPrefilter for T {
153 #[inline]
154 fn prefilter(&self) -> Prefilter { Parser::prefilter(self) }
155}
156
157#[derive(Debug, Default, Clone)]
160pub struct Prefilter {
161 pub account: Option<AccountPrefilter>,
163 pub transaction: Option<TransactionPrefilter>,
165 pub block_meta: Option<BlockMetaPrefilter>,
167 pub block: Option<BlockPrefilter>,
169 pub slot: Option<SlotPrefilter>,
171}
172
/// Merges the optional value `rhs` into `lhs`.
///
/// * `rhs` is `None`: `lhs` is left untouched.
/// * `lhs` is `None`: `lhs` takes the value of `rhs`.
/// * both are `Some`: `f` combines the incoming value into the existing one.
fn merge_opt<T, F: FnOnce(&mut T, T)>(lhs: &mut Option<T>, rhs: Option<T>, f: F) {
    let Some(incoming) = rhs else {
        return;
    };

    match lhs {
        Some(existing) => f(existing, incoming),
        None => *lhs = Some(incoming),
    }
}
180
181impl Prefilter {
182 #[inline]
184 pub fn builder() -> PrefilterBuilder { PrefilterBuilder::default() }
185
186 pub fn merge(&mut self, other: Prefilter) {
189 let Self {
190 account,
191 transaction,
192 block_meta,
193 block,
194 slot,
195 } = self;
196 merge_opt(account, other.account, AccountPrefilter::merge);
197 merge_opt(transaction, other.transaction, TransactionPrefilter::merge);
198 merge_opt(block_meta, other.block_meta, BlockMetaPrefilter::merge);
199 merge_opt(block, other.block, BlockPrefilter::merge);
200 merge_opt(slot, other.slot, SlotPrefilter::merge);
201 }
202}
203
204impl FromIterator<Prefilter> for Prefilter {
205 fn from_iter<T: IntoIterator<Item = Prefilter>>(iter: T) -> Self {
206 let mut iter = iter.into_iter();
207 let Some(ret) = iter.next() else {
208 return Self::default();
209 };
210 iter.fold(ret, |mut l, r| {
211 l.merge(r);
212 l
213 })
214 }
215}
216
217#[derive(Debug, Default, Clone, PartialEq)]
219pub struct AccountPrefilter {
220 pub accounts: HashSet<Pubkey>,
222 pub owners: HashSet<Pubkey>,
224}
225
226impl AccountPrefilter {
227 pub fn merge(&mut self, other: AccountPrefilter) {
230 let Self { accounts, owners } = self;
231 accounts.extend(other.accounts);
232 owners.extend(other.owners);
233 }
234}
235
236#[derive(Debug, Default, Clone, PartialEq)]
238pub struct TransactionPrefilter {
239 pub accounts_include: HashSet<Pubkey>,
242 pub accounts_required: HashSet<Pubkey>,
246}
247
248impl TransactionPrefilter {
249 pub fn merge(&mut self, other: TransactionPrefilter) {
252 let Self {
253 accounts_include,
254 accounts_required,
255 } = self;
256
257 accounts_include.extend(other.accounts_include);
258 accounts_required.extend(other.accounts_required);
259 }
260}
261
/// Prefilter for block-meta updates.  It has no options; its presence alone
/// requests block-meta updates.
#[derive(Debug, Default, Clone, PartialEq, Copy)]
pub struct BlockMetaPrefilter {}

impl BlockMetaPrefilter {
    /// Merges two block-meta prefilters.
    ///
    /// The type carries no data, so there is nothing to combine; the function
    /// exists so it can be used like the other prefilters' `merge` functions.
    pub fn merge(_lhs: &mut Self, _rhs: Self) {
        // Intentionally empty.
    }
}
271
272#[derive(Debug, Default, Clone, PartialEq)]
274pub struct BlockPrefilter {
275 pub accounts_include: HashSet<Pubkey>,
277 pub include_transactions: bool,
279 pub include_accounts: bool,
281 pub include_entries: bool,
283}
284
285impl BlockPrefilter {
286 pub fn merge(&mut self, other: BlockPrefilter) {
288 let Self {
289 accounts_include,
290 include_transactions,
291 include_accounts,
292 include_entries,
293 } = self;
294
295 accounts_include.extend(other.accounts_include);
296 *include_accounts = other.include_accounts;
297 *include_transactions = other.include_transactions;
298 *include_entries = other.include_entries;
299 }
300}
301
/// Prefilter for slot updates.  It has no options; its presence alone
/// requests slot updates.
#[derive(Debug, Default, Clone, PartialEq, Copy)]
pub struct SlotPrefilter {}

impl SlotPrefilter {
    /// Merges two slot prefilters.
    ///
    /// The type carries no data, so there is nothing to combine; the function
    /// exists so it can be used like the other prefilters' `merge` functions.
    pub fn merge(_lhs: &mut Self, _rhs: Self) {
        // Intentionally empty.
    }
}
311
/// Generates two crate-private conversion functions between `$ty` and this
/// crate's `Pubkey`:
///
/// * `into_vixen_pubkey(value: $ty) -> Pubkey`, which calls
///   `value.to_bytes()` and converts the result with `.into()`;
/// * `from_vixen_pubkey(value: Pubkey) -> $ty`, which converts the key's raw
///   byte array with `.into()`.
///
/// `$ty` must therefore provide a `to_bytes()` whose result converts into
/// `Pubkey`, and must itself be constructible from the key's byte array.
#[macro_export]
macro_rules! pubkey_convert_helpers {
    ($ty:ty) => {
        pub(crate) fn into_vixen_pubkey(value: $ty) -> $crate::Pubkey {
            value.to_bytes().into()
        }

        pub(crate) fn from_vixen_pubkey(value: $crate::Pubkey) -> $ty {
            value.into_bytes().into()
        }
    };
}
335
/// A 32-byte public key.
pub type Pubkey = KeyBytes<32>;

/// A fixed-length byte string of `LEN` bytes, used for keys.
///
/// `#[repr(transparent)]` gives it the same layout as the wrapped array.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
#[repr(transparent)]
pub struct KeyBytes<const LEN: usize>(
    /// The raw bytes.
    pub [u8; LEN],
);
350
351impl<const LEN: usize> Debug for KeyBytes<LEN> {
352 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
353 f.debug_tuple("KeyBytes")
354 .field(&bs58::encode(self.0).into_string())
355 .finish()
356 }
357}
358
359impl<const LEN: usize> fmt::Display for KeyBytes<LEN> {
360 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
361 f.write_str(&bs58::encode(self.0).into_string())
362 }
363}
364
365impl<const LEN: usize> From<[u8; LEN]> for KeyBytes<LEN> {
366 #[inline]
367 fn from(value: [u8; LEN]) -> Self { Self(value) }
368}
369
370impl<const LEN: usize> From<KeyBytes<LEN>> for [u8; LEN] {
371 #[inline]
372 fn from(value: KeyBytes<LEN>) -> Self { value.0 }
373}
374
375impl<const LEN: usize> std::ops::Deref for KeyBytes<LEN> {
376 type Target = [u8; LEN];
377
378 fn deref(&self) -> &Self::Target { &self.0 }
379}
380
381impl<const LEN: usize> std::ops::DerefMut for KeyBytes<LEN> {
382 fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 }
383}
384
385impl<const LEN: usize> AsRef<[u8; LEN]> for KeyBytes<LEN> {
386 fn as_ref(&self) -> &[u8; LEN] { self }
387}
388
389impl<const LEN: usize> AsMut<[u8; LEN]> for KeyBytes<LEN> {
390 fn as_mut(&mut self) -> &mut [u8; LEN] { self }
391}
392
393impl<const LEN: usize> std::borrow::Borrow<[u8; LEN]> for KeyBytes<LEN> {
394 fn borrow(&self) -> &[u8; LEN] { self }
395}
396
397impl<const LEN: usize> std::borrow::BorrowMut<[u8; LEN]> for KeyBytes<LEN> {
398 fn borrow_mut(&mut self) -> &mut [u8; LEN] { self }
399}
400
401impl<const LEN: usize> AsRef<[u8]> for KeyBytes<LEN> {
402 fn as_ref(&self) -> &[u8] { self.as_slice() }
403}
404
405impl<const LEN: usize> AsMut<[u8]> for KeyBytes<LEN> {
406 fn as_mut(&mut self) -> &mut [u8] { self.as_mut_slice() }
407}
408
409impl<const LEN: usize> std::borrow::Borrow<[u8]> for KeyBytes<LEN> {
410 fn borrow(&self) -> &[u8] { self.as_ref() }
411}
412
413impl<const LEN: usize> std::borrow::BorrowMut<[u8]> for KeyBytes<LEN> {
414 fn borrow_mut(&mut self) -> &mut [u8] { self.as_mut() }
415}
416
417type KeyFromSliceError = std::array::TryFromSliceError;
418
419impl<const LEN: usize> TryFrom<&[u8]> for KeyBytes<LEN> {
420 type Error = KeyFromSliceError;
421
422 #[inline]
423 fn try_from(value: &[u8]) -> Result<Self, Self::Error> { value.try_into().map(Self) }
424}
425
426impl<const LEN: usize> KeyBytes<LEN> {
427 #[must_use]
429 pub fn new(bytes: [u8; LEN]) -> Self { bytes.into() }
430
431 #[must_use]
433 pub fn into_bytes(self) -> [u8; LEN] { self.into() }
434
435 pub fn try_from_ref<T: AsRef<[u8]>>(key: T) -> Result<Self, KeyFromSliceError> {
441 key.as_ref().try_into()
442 }
443
444 pub fn equals_ref<T: AsRef<[u8]>>(&self, other: T) -> bool {
447 self.as_slice().eq(other.as_ref())
448 }
449}
450
451#[derive(Debug, Clone, Copy, thiserror::Error)]
453pub enum KeyFromStrError<const LEN: usize = 32> {
454 #[error("Invalid base58 string")]
456 Bs58(#[from] bs58::decode::Error),
457 #[error("Invalid key length, must be {LEN} bytes")]
459 Len(#[from] std::array::TryFromSliceError),
460}
461
462impl<const LEN: usize> FromStr for KeyBytes<LEN> {
463 type Err = KeyFromStrError<LEN>;
464
465 fn from_str(s: &str) -> Result<Self, Self::Err> {
466 bs58::decode(s)
467 .into_vec()?
468 .as_slice()
469 .try_into()
470 .map_err(Into::into)
471 }
472}
473
474impl<const LEN: usize> TryFrom<&str> for KeyBytes<LEN> {
475 type Error = KeyFromStrError<LEN>;
476
477 fn try_from(value: &str) -> Result<Self, Self::Error> { value.parse() }
478}
479
480impl<const LEN: usize> TryFrom<String> for KeyBytes<LEN> {
481 type Error = KeyFromStrError<LEN>;
482
483 fn try_from(value: String) -> Result<Self, Self::Error> { value.parse() }
484}
485
486impl<const LEN: usize> TryFrom<Cow<'_, str>> for KeyBytes<LEN> {
487 type Error = KeyFromStrError<LEN>;
488
489 fn try_from(value: Cow<str>) -> Result<Self, Self::Error> { value.parse() }
490}
491
492#[derive(Debug, Clone, thiserror::Error)]
494pub enum PrefilterError {
495 #[error("Value already given for field {0}")]
497 AlreadySet(&'static str),
498 #[error("Invalid pubkey {}", bs58::encode(.0).into_string())]
500 BadPubkey(Vec<u8>, std::array::TryFromSliceError),
501}
502
503#[derive(Debug, Default)]
505#[must_use = "Consider calling .build() on this builder"]
506#[allow(clippy::struct_excessive_bools)]
507pub struct PrefilterBuilder {
508 error: Option<PrefilterError>,
509 slots: bool,
510 block_metas: bool,
511 block_accounts_include: Option<HashSet<Pubkey>>,
513 block_include_accounts: bool,
515 block_include_transactions: bool,
517 block_include_entries: bool,
519 accounts_include_all: bool,
521 accounts: Option<HashSet<Pubkey>>,
523 account_owners: Option<HashSet<Pubkey>>,
525 transaction_accounts_include: Option<HashSet<Pubkey>>,
527 transaction_accounts_required: Option<HashSet<Pubkey>>,
529}
530
531fn set_opt<T>(opt: &mut Option<T>, field: &'static str, val: T) -> Result<(), PrefilterError> {
532 if opt.is_some() {
533 return Err(PrefilterError::AlreadySet(field));
534 }
535
536 *opt = Some(val);
537 Ok(())
538}
539
540fn collect_pubkeys<I: IntoIterator>(it: I) -> Result<HashSet<Pubkey>, PrefilterError>
542where I::Item: AsRef<[u8]> {
543 it.into_iter()
544 .map(|p| {
545 let p = p.as_ref();
546 p.try_into()
547 .map_err(|e| PrefilterError::BadPubkey(p.to_vec(), e))
548 })
549 .collect()
550}
551
552impl PrefilterBuilder {
553 pub fn build(self) -> Result<Prefilter, PrefilterError> {
558 let PrefilterBuilder {
559 error,
560 accounts_include_all,
561 accounts,
562 account_owners,
563 slots,
564 block_metas,
565 block_accounts_include,
566 block_include_accounts,
567 block_include_entries,
568 block_include_transactions,
569 transaction_accounts_include,
570 transaction_accounts_required,
571 } = self;
572 if let Some(err) = error {
573 return Err(err);
574 }
575
576 let account = AccountPrefilter {
577 accounts: accounts.unwrap_or_default(),
578 owners: account_owners.unwrap_or_default(),
579 };
580
581 let transaction = TransactionPrefilter {
582 accounts_include: transaction_accounts_include.unwrap_or_default(),
583 accounts_required: transaction_accounts_required.unwrap_or_default(),
584 };
585
586 let block_meta = BlockMetaPrefilter {};
587
588 let block = BlockPrefilter {
589 accounts_include: block_accounts_include.unwrap_or_default(),
590 include_accounts: block_include_accounts,
591 include_transactions: block_include_transactions,
592 include_entries: block_include_entries,
593 };
594
595 let slot = SlotPrefilter {};
596
597 let account = if accounts_include_all {
598 Some(AccountPrefilter::default())
599 } else {
600 (account != AccountPrefilter::default()).then_some(account)
601 };
602
603 Ok(Prefilter {
604 account,
605 transaction: (transaction != TransactionPrefilter::default()).then_some(transaction),
606 block_meta: block_metas.then_some(block_meta),
607 block: (block != BlockPrefilter::default()).then_some(block),
608 slot: slots.then_some(slot),
609 })
610 }
611
612 fn mutate<F: FnOnce(&mut Self) -> Result<(), PrefilterError>>(mut self, f: F) -> Self {
613 if self.error.is_none() {
614 self.error = f(&mut self).err();
615 }
616
617 self
618 }
619
620 pub fn slots(self) -> Self {
622 self.mutate(|this| {
623 this.slots = true;
624 Ok(())
625 })
626 }
627
628 pub fn block_metas(self) -> Self {
630 self.mutate(|this| {
631 this.block_metas = true;
632 Ok(())
633 })
634 }
635
636 pub fn accounts_include_all(self) -> Self {
638 self.mutate(|this| {
639 this.accounts_include_all = true;
640 Ok(())
641 })
642 }
643
644 pub fn accounts<I: IntoIterator>(self, it: I) -> Self
646 where I::Item: AsRef<[u8]> {
647 self.mutate(|this| set_opt(&mut this.accounts, "accounts", collect_pubkeys(it)?))
648 }
649
650 pub fn account_owners<I: IntoIterator>(self, it: I) -> Self
652 where I::Item: AsRef<[u8]> {
653 self.mutate(|this| {
654 set_opt(
655 &mut this.account_owners,
656 "account_owners",
657 collect_pubkeys(it)?,
658 )
659 })
660 }
661
662 pub fn transaction_accounts<I: IntoIterator>(self, it: I) -> Self
668 where I::Item: AsRef<[u8]> {
669 self.mutate(|this| {
670 set_opt(
671 &mut this.transaction_accounts_required,
672 "transaction_accounts_required",
673 collect_pubkeys(it)?,
674 )
675 })
676 }
677
678 pub fn transaction_accounts_include<I: IntoIterator>(self, it: I) -> Self
683 where I::Item: AsRef<[u8]> {
684 self.mutate(|this| {
685 set_opt(
686 &mut this.transaction_accounts_include,
687 "transaction_accounts_include",
688 collect_pubkeys(it)?,
689 )
690 })
691 }
692
693 pub fn block_accounts_include<I: IntoIterator>(self, it: I) -> Self
695 where I::Item: AsRef<[u8]> {
696 self.mutate(|this| {
697 set_opt(
698 &mut this.block_accounts_include,
699 "block_accounts_include",
700 collect_pubkeys(it)?,
701 )
702 })
703 }
704
705 pub fn block_include_accounts(self) -> Self {
707 self.mutate(|this| {
708 this.block_include_accounts = true;
709 Ok(())
710 })
711 }
712
713 pub fn block_include_transactions(self) -> Self {
715 self.mutate(|this| {
716 this.block_include_transactions = true;
717 Ok(())
718 })
719 }
720
721 pub fn block_include_entries(self) -> Self {
723 self.mutate(|this| {
724 this.block_include_entries = true;
725 Ok(())
726 })
727 }
728}
729
730#[derive(Debug, Clone)]
732pub struct Filters {
733 pub parsers_filters: HashMap<String, Prefilter>,
735}
736
737impl Filters {
738 #[inline]
740 #[must_use]
741 pub fn new(filters: HashMap<String, Prefilter>) -> Self {
742 Self {
743 parsers_filters: filters,
744 }
745 }
746}
747
748#[derive(Debug, Clone, Copy, Deserialize, clap::ValueEnum)]
751#[serde(rename_all = "lowercase")]
752pub enum CommitmentLevel {
753 Processed,
755 Confirmed,
757 Finalized,
759}
760
761impl From<geyser::CommitmentLevel> for CommitmentLevel {
762 fn from(value: geyser::CommitmentLevel) -> Self {
763 match value {
764 geyser::CommitmentLevel::Processed => Self::Processed,
765 geyser::CommitmentLevel::Confirmed => Self::Confirmed,
766 geyser::CommitmentLevel::Finalized => Self::Finalized,
767 }
768 }
769}
770
771impl From<Filters> for SubscribeRequest {
772 fn from(value: Filters) -> Self {
773 SubscribeRequest {
774 accounts: value
775 .parsers_filters
776 .iter()
777 .filter_map(|(k, v)| {
778 let v = v.account.as_ref()?;
779
780 Some((k.clone(), SubscribeRequestFilterAccounts {
781 account: v.accounts.iter().map(ToString::to_string).collect(),
782 owner: v.owners.iter().map(ToString::to_string).collect(),
783 filters: vec![],
785 nonempty_txn_signature: None,
787 }))
788 })
789 .collect(),
790 slots: value
791 .parsers_filters
792 .iter()
793 .filter_map(|(k, v)| {
794 v.slot?;
795 Some((k.clone(), SubscribeRequestFilterSlots {
796 filter_by_commitment: Some(true),
797 interslot_updates: None,
798 }))
799 })
800 .collect(),
801 transactions: value
802 .parsers_filters
803 .iter()
804 .filter_map(|(k, v)| {
805 let v = v.transaction.as_ref()?;
806
807 Some((k.clone(), SubscribeRequestFilterTransactions {
808 vote: None,
809 failed: Some(false),
811 signature: None,
812 account_include: v
813 .accounts_include
814 .iter()
815 .map(ToString::to_string)
816 .collect(),
817 account_exclude: [].into_iter().collect(),
818 account_required: v
819 .accounts_required
820 .iter()
821 .map(ToString::to_string)
822 .collect(),
823 }))
824 })
825 .collect(),
826 transactions_status: [].into_iter().collect(),
827 blocks: value
828 .parsers_filters
829 .iter()
830 .filter_map(|(k, v)| {
831 let v = v.block.as_ref()?;
832
833 Some((k.clone(), SubscribeRequestFilterBlocks {
834 account_include: v
835 .accounts_include
836 .iter()
837 .map(ToString::to_string)
838 .collect(),
839 include_transactions: Some(v.include_transactions),
840 include_accounts: Some(v.include_accounts),
841 include_entries: Some(v.include_entries),
842 }))
843 })
844 .collect(),
845 blocks_meta: value
846 .parsers_filters
847 .iter()
848 .filter_map(|(k, v)| {
849 v.block_meta?;
850 Some((k.clone(), SubscribeRequestFilterBlocksMeta {}))
851 })
852 .collect(),
853 entry: [].into_iter().collect(),
854 commitment: None,
855 accounts_data_slice: vec![],
856 ping: None,
857 from_slot: None,
858 }
859 }
860}