1pub mod config;
269
270#[cfg(feature = "prometheus")]
271pub mod metrics;
272
273pub(crate) mod grpc;
274pub(crate) mod runtime;
275pub(crate) mod util;
276
277use {
278 crate::proto::GetSlotRangeRequest,
279 config::FumaroleConfig,
280 futures::future::{Either, select},
281 proto::control_response::Response,
282 runtime::{
283 state_machine::{DEFAULT_SLOT_MEMORY_RETENTION, FumaroleSM},
284 tokio::{
285 DEFAULT_GC_INTERVAL, DownloadTaskRunnerChannels, LegacyGrpcDownloadTaskRunner,
286 TokioFumeDragonsmouthRuntime,
287 },
288 },
289 semver::Version,
290 std::{
291 collections::HashMap,
292 num::{NonZeroU8, NonZeroUsize},
293 sync::Arc,
294 time::{Duration, Instant},
295 },
296 tokio::sync::mpsc,
297 tokio_stream::wrappers::ReceiverStream,
298 tonic::{
299 metadata::{
300 Ascii, MetadataKey, MetadataValue,
301 errors::{InvalidMetadataKey, InvalidMetadataValue},
302 },
303 service::{Interceptor, interceptor::InterceptedService},
304 transport::{Channel, ClientTlsConfig},
305 },
306 util::grpc::into_bounded_mpsc_rx,
307 uuid::Uuid,
308};
309
/// Private namespace re-exporting the Solana storage protobuf types from
/// `yellowstone_grpc_proto` (the `storage` module, its `confirmed_block`
/// submodule, and every item inside `confirmed_block`).
mod solana {
    // Not every re-exported name is referenced, hence the lint allowance.
    #[allow(unused_imports)]
    pub use yellowstone_grpc_proto::solana::{
        storage,
        storage::{confirmed_block, confirmed_block::*},
    };
}
317
/// Private namespace re-exporting every item of `yellowstone_grpc_proto::geyser`
/// (e.g. `SubscribeRequest` and `SubscribeUpdate`, which this file uses).
mod geyser {
    pub use yellowstone_grpc_proto::geyser::*;
}
321
/// Protobuf/gRPC bindings for the `fumarole` package, pulled in through
/// `tonic::include_proto!`.
// NOTE(review): the clippy allowances are presumably here because the module
// body is generated code — confirm before tightening them.
#[allow(clippy::missing_const_for_fn)]
#[allow(clippy::all)]
pub mod proto {
    tonic::include_proto!("fumarole");
}
327
328use {
329 crate::grpc::FumaroleGrpcConnector,
330 proto::{JoinControlPlane, fumarole_client::FumaroleClient as TonicFumaroleClient},
331 runtime::tokio::DataPlaneConn,
332 tonic::transport::Endpoint,
333};
334
/// State for the gRPC interceptor that decorates outgoing requests with
/// authentication and custom metadata.
#[derive(Clone)]
struct FumeInterceptor {
    /// Optional value sent under the `x-token` metadata key.
    x_token: Option<MetadataValue<Ascii>>,
    /// Additional ASCII metadata entries added to each request.
    metadata: HashMap<MetadataKey<Ascii>, MetadataValue<Ascii>>,
}
340
341impl Interceptor for FumeInterceptor {
342 fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
343 let mut request = request;
344 let metadata = request.metadata_mut();
345 if let Some(x_token) = &self.x_token {
346 metadata.insert("x-token", x_token.clone());
347 }
348 for (key, value) in &self.metadata {
349 metadata.insert(key.clone(), value.clone());
350 }
351 Ok(request)
352 }
353}
354
/// Options holder for building a Fumarole client.
///
/// NOTE(review): nothing in this file consumes this type; it is presumably
/// used by callers or other modules — confirm.
#[derive(Default)]
pub struct FumaroleClientBuilder {
    /// Extra ASCII metadata entries (key/value) to associate with the client.
    pub metadata: HashMap<MetadataKey<Ascii>, MetadataValue<Ascii>>,
    /// Compression toggle; defaults to `false` through `#[derive(Default)]`.
    pub with_compression: bool,
}
363
/// Error produced when a string pair cannot be converted into a gRPC metadata
/// header. Both variants forward the display and source of the wrapped tonic error.
#[derive(Debug, thiserror::Error)]
pub enum InvalidMetadataHeader {
    /// The header name is not a valid metadata key.
    #[error(transparent)]
    InvalidMetadataKey(#[from] InvalidMetadataKey),
    /// The header value is not a valid ASCII metadata value.
    #[error(transparent)]
    InvalidMetadataValue(#[from] InvalidMetadataValue),
}
371
/// Errors that can occur while establishing a connection to the Fumarole service.
/// Every variant transparently wraps the underlying error.
#[derive(Debug, thiserror::Error)]
pub enum ConnectError {
    /// The endpoint string could not be parsed as a URI.
    #[error(transparent)]
    InvalidUri(#[from] http::uri::InvalidUri),
    /// The tonic transport reported an error (e.g. TLS setup or connection).
    #[error(transparent)]
    TransportError(#[from] tonic::transport::Error),
    /// The x-token could not be encoded as a metadata value.
    #[error(transparent)]
    InvalidXToken(#[from] tonic::metadata::errors::InvalidMetadataValue),
    /// A custom metadata header had an invalid key or value.
    #[error(transparent)]
    InvalidMetadataHeader(#[from] InvalidMetadataHeader),
}
383
/// Default for [`FumaroleSubscribeConfig::data_channel_capacity`]; also the
/// capacity of the update channel created by
/// [`FumaroleClient::dragonsmouth_subscribe_with_config_on`].
pub const DEFAULT_DRAGONSMOUTH_CAPACITY: usize = 10_000_000;

/// Default for [`FumaroleSubscribeConfig::commit_interval`] (10 seconds).
pub const DEFAULT_COMMIT_INTERVAL: Duration = Duration::from_secs(10);

/// Default for [`FumaroleSubscribeConfig::max_failed_slot_download_attempt`].
pub const DEFAULT_MAX_SLOT_DOWNLOAD_ATTEMPT: usize = 3;

/// Upper bound enforced (by assertion) on
/// [`FumaroleSubscribeConfig::num_data_plane_tcp_connections`].
const MAX_PARA_DATA_STREAMS: u8 = 20;

/// Default for [`FumaroleSubscribeConfig::num_data_plane_tcp_connections`].
pub const DEFAULT_PARA_DATA_STREAMS: u8 = 4;

/// Default for [`FumaroleSubscribeConfig::concurrent_download_limit_per_tcp`].
pub const DEFAULT_CONCURRENT_DOWNLOAD_LIMIT_PER_TCP: usize = 2;

/// Default for [`FumaroleSubscribeConfig::refresh_tip_stats_interval`] (5 seconds),
/// which is also the minimum accepted when subscribing.
pub const DEFAULT_REFRESH_TIP_INTERVAL: Duration = Duration::from_secs(5);

/// Tonic-generated Fumarole client running over a [`Channel`] wrapped with
/// [`FumeInterceptor`].
pub(crate) type GrpcFumaroleClient =
    TonicFumaroleClient<InterceptedService<Channel, FumeInterceptor>>;
/// Client for the Fumarole service.
///
/// Cloning yields another handle built from clones of the connector and of the
/// underlying gRPC client.
#[derive(Clone)]
pub struct FumaroleClient {
    /// Connector used to open additional gRPC connections (e.g. the data-plane
    /// connections created when subscribing).
    connector: FumaroleGrpcConnector,
    /// gRPC client used for the RPCs issued directly by this type.
    inner: GrpcFumaroleClient,
}
430
/// Errors surfaced by a Dragonsmouth-style subscription.
#[derive(Debug, thiserror::Error)]
pub enum DragonsmouthSubscribeError {
    /// A gRPC status was returned; display and source are forwarded from it.
    #[error(transparent)]
    GrpcStatus(#[from] tonic::Status),
    /// The gRPC stream was closed.
    #[error("grpc stream closed")]
    StreamClosed,
}
438
/// Errors surfaced by a Fumarole stream.
#[derive(Debug, thiserror::Error)]
pub enum FumaroleStreamError {
    /// Arbitrary boxed error; display and source are forwarded from it.
    /// There is no `#[from]` here, so it must be constructed explicitly.
    #[error(transparent)]
    Custom(Box<dyn std::error::Error + Send + Sync>),
    /// The gRPC stream was closed.
    #[error("grpc stream closed")]
    StreamClosed,
}
446
/// Tuning knobs for a Fumarole subscription; see the [`Default`] impl for the
/// default values.
pub struct FumaroleSubscribeConfig {
    /// Number of data-plane connections opened when subscribing.
    /// Subscribing asserts that this does not exceed `MAX_PARA_DATA_STREAMS`.
    pub num_data_plane_tcp_connections: NonZeroU8,

    /// Concurrent download limit per data-plane connection. In non-sharded mode
    /// it is multiplied by the connection count to form one global limit.
    pub concurrent_download_limit_per_tcp: NonZeroUsize,

    /// Commit interval handed to the subscription runtime.
    pub commit_interval: Duration,

    /// Maximum number of failed download attempts for a slot, handed to the
    /// download task runner.
    pub max_failed_slot_download_attempt: usize,

    /// Capacity of the channel backing [`DragonsmouthAdapterSession::sink`].
    /// NOTE(review): the update channel behind `source` is sized with
    /// `DEFAULT_DRAGONSMOUTH_CAPACITY` instead — confirm this is intended.
    pub data_channel_capacity: NonZeroUsize,

    /// Garbage-collection interval handed to the subscription runtime
    /// (unit not visible from this file).
    pub gc_interval: usize,

    /// Slot memory retention handed to the state machine (`FumaroleSM::new`).
    pub slot_memory_retention: usize,

    /// Interval at which the runtime refreshes tip statistics.
    /// Subscribing asserts that this is at least 5 seconds.
    pub refresh_tip_stats_interval: Duration,

    /// Forwarded to the runtime's `no_commit` flag.
    /// NOTE(review): presumably disables offset commits — confirm.
    pub no_commit: bool,

    /// Requests sharded block download; it is only used when the service
    /// version reported by the server supports it.
    pub enable_sharded_block_download: bool,
}
505
506impl Default for FumaroleSubscribeConfig {
507 fn default() -> Self {
508 Self {
509 num_data_plane_tcp_connections: NonZeroU8::new(DEFAULT_PARA_DATA_STREAMS).unwrap(),
510 concurrent_download_limit_per_tcp: NonZeroUsize::new(
511 DEFAULT_CONCURRENT_DOWNLOAD_LIMIT_PER_TCP,
512 )
513 .unwrap(),
514 commit_interval: DEFAULT_COMMIT_INTERVAL,
515 max_failed_slot_download_attempt: DEFAULT_MAX_SLOT_DOWNLOAD_ATTEMPT,
516 data_channel_capacity: NonZeroUsize::new(DEFAULT_DRAGONSMOUTH_CAPACITY).unwrap(),
517 gc_interval: DEFAULT_GC_INTERVAL,
518 slot_memory_retention: DEFAULT_SLOT_MEMORY_RETENTION,
519 refresh_tip_stats_interval: DEFAULT_REFRESH_TIP_INTERVAL, no_commit: false,
521 enable_sharded_block_download: true,
522 }
523 }
524}
525
/// Error conditions of the Fumarole control plane.
pub enum FumeControlPlaneError {
    /// The control-plane connection was lost.
    Disconnected,
}
529
/// Error conditions of the Fumarole data plane.
pub enum FumeDataPlaneError {
    /// The data-plane connection was lost.
    Disconnected,
}
533
/// Coarse classification of Fumarole failures; can be built from a
/// `tonic::Status` through the `From` impl in this file.
pub enum FumaroleError {
    /// Produced from a status with code `Unavailable`.
    ControlPlaneDisconnected,
    /// Produced from a status with code `Internal`.
    DataPlaneDisconnected,
    /// Produced from a status with any other code.
    InvalidSubscribeRequest,
}
539
540impl From<tonic::Status> for FumaroleError {
541 fn from(status: tonic::Status) -> Self {
542 match status.code() {
543 tonic::Code::Unavailable => FumaroleError::ControlPlaneDisconnected,
544 tonic::Code::Internal => FumaroleError::DataPlaneDisconnected,
545 _ => FumaroleError::InvalidSubscribeRequest,
546 }
547 }
548}
549
/// Handles returned to the caller of a Dragonsmouth-style subscription.
pub struct DragonsmouthAdapterSession {
    /// Sender for `SubscribeRequest` messages directed at the running subscription.
    pub sink: mpsc::Sender<geyser::SubscribeRequest>,
    /// Receiver of subscription output: either an update or a gRPC status.
    pub source: mpsc::Receiver<Result<geyser::SubscribeUpdate, tonic::Status>>,
    /// Join handle of the supervising task, which completes as soon as either
    /// the download task runner or the Fumarole runtime task finishes.
    pub fumarole_handle: tokio::task::JoinHandle<()>,
}
574
575fn string_pairs_to_metadata_header(
576 headers: impl IntoIterator<Item = (impl AsRef<str>, impl AsRef<str>)>,
577) -> Result<HashMap<MetadataKey<Ascii>, MetadataValue<Ascii>>, InvalidMetadataHeader> {
578 headers
579 .into_iter()
580 .map(|(k, v)| {
581 let key = MetadataKey::from_bytes(k.as_ref().as_bytes())?;
582 let value: MetadataValue<Ascii> = v.as_ref().try_into()?;
583 Ok((key, value))
584 })
585 .collect()
586}
587
588impl FumaroleClient {
589 pub async fn connect(config: FumaroleConfig) -> Result<FumaroleClient, ConnectError> {
590 let connection_window_size: u32 = config
591 .initial_connection_window_size
592 .as_u64()
593 .try_into()
594 .expect("initial_connection_window_size must fit in u32");
595 let stream_window_size: u32 = config
596 .initial_stream_window_size
597 .as_u64()
598 .try_into()
599 .expect("initial_stream_window_size must fit in u32");
600
601 let mut tonic_endpoints = Vec::with_capacity(1);
602 #[allow(clippy::single_element_loop)]
603 for endpoint_str in [config.endpoint.clone()] {
604 let endpoints = Endpoint::from_shared(endpoint_str)?
605 .tls_config(ClientTlsConfig::new().with_native_roots())?
606 .initial_connection_window_size(connection_window_size)
607 .initial_stream_window_size(stream_window_size)
608 .http2_adaptive_window(config.enable_http2_adaptive_window);
609 tonic_endpoints.push(endpoints);
610 }
611
612 let connector = FumaroleGrpcConnector {
613 config: config.clone(),
614 endpoints: tonic_endpoints,
615 connect_cnt: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
616 };
617
618 let client = connector.connect().await?;
619 Ok(FumaroleClient {
620 connector,
621 inner: client,
622 })
623 }
624
625 pub async fn version(&mut self) -> Result<proto::VersionResponse, tonic::Status> {
629 let request = tonic::Request::new(proto::VersionRequest {});
630 let response = self.inner.version(request).await?;
631 Ok(response.into_inner())
632 }
633
634 pub async fn dragonsmouth_subscribe<S>(
638 &mut self,
639 subscriber_name: S,
640 request: geyser::SubscribeRequest,
641 ) -> Result<DragonsmouthAdapterSession, tonic::Status>
642 where
643 S: AsRef<str>,
644 {
645 let handle = tokio::runtime::Handle::current();
646 self.dragonsmouth_subscribe_with_config_on(
647 subscriber_name,
648 request,
649 Default::default(),
650 handle,
651 )
652 .await
653 }
654
655 pub async fn dragonsmouth_subscribe_with_config<S>(
656 &mut self,
657 consumer_group_name: S,
658 request: geyser::SubscribeRequest,
659 config: FumaroleSubscribeConfig,
660 ) -> Result<DragonsmouthAdapterSession, tonic::Status>
661 where
662 S: AsRef<str>,
663 {
664 let handle = tokio::runtime::Handle::current();
665 self.dragonsmouth_subscribe_with_config_on(consumer_group_name, request, config, handle)
666 .await
667 }
668
669 pub async fn dragonsmouth_subscribe_with_config_on<S>(
674 &mut self,
675 subscriber_name: S,
676 request: geyser::SubscribeRequest,
677 config: FumaroleSubscribeConfig,
678 handle: tokio::runtime::Handle,
679 ) -> Result<DragonsmouthAdapterSession, tonic::Status>
680 where
681 S: AsRef<str>,
682 {
683 let version = self.version().await?;
684 let semver_version = Version::parse(&version.version).ok();
685 if let Some(semver_version) = &semver_version {
686 tracing::debug!("Fumarole service version: {}", semver_version);
687 } else {
688 tracing::warn!(
689 "Failed to parse fumarole service version: {}",
690 version.version
691 );
692 }
693 const SHARDED_DOWNLOAD_MINIMUM_MINOR_VERSION: u64 = 39;
694 let use_sharded_downlaod = config.enable_sharded_block_download
695 && semver_version
696 .filter(|v| v.minor > SHARDED_DOWNLOAD_MINIMUM_MINOR_VERSION)
697 .is_none();
698
699 if config.enable_sharded_block_download && !use_sharded_downlaod {
700 tracing::warn!(
701 "Sharded block download is enabled in the config, but the fumarole service version {} does not support it. Falling back to non-sharded download.",
702 version.version
703 );
704 }
705
706 if use_sharded_downlaod {
707 tracing::debug!("using sharded block download");
708 } else {
709 tracing::debug!("using non-sharded block download");
710 }
711
712 let request = Arc::new(request);
713 assert!(
714 config.num_data_plane_tcp_connections.get() <= MAX_PARA_DATA_STREAMS,
715 "num_data_plane_tcp_connections must be less than or equal to {MAX_PARA_DATA_STREAMS}"
716 );
717
718 assert!(
719 config.refresh_tip_stats_interval >= Duration::from_secs(5),
720 "refresh_tip_stats_interval must be greater than or equal to 5 seconds"
721 );
722
723 use {proto::ControlCommand, runtime::tokio::DragonsmouthSubscribeRequestBidi};
724
725 let (dragonsmouth_outlet, dragonsmouth_inlet) =
726 mpsc::channel(DEFAULT_DRAGONSMOUTH_CAPACITY);
727 let (fume_control_plane_tx, fume_control_plane_rx) = mpsc::channel(100);
728
729 let initial_join = JoinControlPlane {
730 consumer_group_name: Some(subscriber_name.as_ref().to_string()),
731 };
732 let initial_join_command = ControlCommand {
733 command: Some(proto::control_command::Command::InitialJoin(initial_join)),
734 };
735
736 fume_control_plane_tx
739 .send(initial_join_command)
740 .await
741 .expect("failed to send initial join");
742
743 let resp = if use_sharded_downlaod {
744 self.inner
745 .subscribe_v2(ReceiverStream::new(fume_control_plane_rx))
746 .await?
747 } else {
748 self.inner
749 .subscribe(ReceiverStream::new(fume_control_plane_rx))
750 .await?
751 };
752
753 let mut streaming = resp.into_inner();
754 let fume_control_plane_tx = fume_control_plane_tx.clone();
755 let control_response = streaming.message().await?.expect("none");
756 let fume_control_plane_rx = into_bounded_mpsc_rx(100, streaming);
757 let response = control_response.response.expect("none");
758 let Response::Init(initial_state) = response else {
759 panic!("unexpected initial response: {response:?}")
760 };
761
762 assert!(
764 initial_state.last_committed_offsets.len() == 1,
765 "sharding not supported"
766 );
767 let last_committed_offset = initial_state
768 .last_committed_offsets
769 .get(&0)
770 .expect("no last committed offset");
771
772 let sm = FumaroleSM::new(*last_committed_offset, config.slot_memory_retention);
773
774 let (dm_tx, dm_rx) = mpsc::channel(config.data_channel_capacity.get());
775 let dm_bidi = DragonsmouthSubscribeRequestBidi {
776 tx: dm_tx.clone(),
777 rx: dm_rx,
778 };
779
780 let mut data_plane_channel_vec =
781 Vec::with_capacity(config.num_data_plane_tcp_connections.get() as usize);
782 for _ in 0..config.num_data_plane_tcp_connections.get() {
784 let client = self
785 .connector
786 .connect()
787 .await
788 .expect("failed to connect to fumarole");
789 let conn = DataPlaneConn::new(client);
790 data_plane_channel_vec.push(conn);
791 }
792 let (download_task_runner_cnc_tx, download_task_runner_cnc_rx) = mpsc::channel(10);
793 let (download_task_queue_tx, download_task_queue_rx) = mpsc::channel(100);
795 let (download_result_tx, download_result_rx) = mpsc::channel(1000);
796
797 let download_task_runner_jh = if use_sharded_downlaod {
798 let grpc_download_task_runner = runtime::tokio::GrpcShardedDownloadOrchestrator::new(
799 data_plane_channel_vec,
800 self.connector.clone(),
801 download_task_runner_cnc_rx,
802 download_task_queue_rx,
803 download_result_tx,
804 config.max_failed_slot_download_attempt,
805 Arc::clone(&request),
806 config.concurrent_download_limit_per_tcp,
807 dragonsmouth_outlet.clone(),
808 );
809 handle.spawn(grpc_download_task_runner.run())
810 } else {
811 let grpc_download_task_runner = LegacyGrpcDownloadTaskRunner::new(
812 data_plane_channel_vec,
813 self.connector.clone(),
814 download_task_runner_cnc_rx,
815 download_task_queue_rx,
816 download_result_tx,
817 config.max_failed_slot_download_attempt,
818 Arc::clone(&request),
819 config.concurrent_download_limit_per_tcp.get()
820 * config.num_data_plane_tcp_connections.get() as usize,
821 dragonsmouth_outlet.clone(),
822 );
823
824 handle.spawn(grpc_download_task_runner.run())
825 };
826
827 let download_task_runner_chans = DownloadTaskRunnerChannels {
828 download_task_queue_tx,
829 cnc_tx: download_task_runner_cnc_tx,
830 download_result_rx,
831 };
832
833 let tokio_rt = TokioFumeDragonsmouthRuntime {
834 sm,
835 fumarole_client: self.clone(),
836 blockchain_id: initial_state.blockchain_id,
837 dragonsmouth_bidi: dm_bidi,
838 subscribe_request: request,
839 download_task_runner_chans,
840 persistent_subscriber_name: subscriber_name.as_ref().to_string(),
841 control_plane_tx: fume_control_plane_tx,
842 control_plane_rx: fume_control_plane_rx,
843 dragonsmouth_outlet,
844 commit_interval: config.commit_interval,
845 last_commit: Instant::now(),
846 get_tip_interval: config.refresh_tip_stats_interval,
847 last_tip: Instant::now(),
848 gc_interval: config.gc_interval,
849 non_critical_background_jobs: Default::default(),
850 last_history_poll: Default::default(),
851 no_commit: config.no_commit,
852 stop: false,
853 enable_sharded_block_download: use_sharded_downlaod,
854 };
855 let fumarole_rt_jh = handle.spawn(tokio_rt.run());
856 let fut = async move {
857 let either = select(download_task_runner_jh, fumarole_rt_jh).await;
858 match either {
859 Either::Left((result, _)) => {
860 let _ = result.expect("fumarole download task runner failed");
861 }
862 Either::Right((result, _)) => {
863 let _ = result.expect("fumarole runtime failed");
864 }
865 }
866 };
867 let fumarole_handle = handle.spawn(fut);
868 let dm_session = DragonsmouthAdapterSession {
869 sink: dm_tx,
870 source: dragonsmouth_inlet,
871 fumarole_handle,
872 };
873 Ok(dm_session)
874 }
875
876 pub async fn list_consumer_groups(
877 &mut self,
878 request: impl tonic::IntoRequest<proto::ListConsumerGroupsRequest>,
879 ) -> std::result::Result<tonic::Response<proto::ListConsumerGroupsResponse>, tonic::Status>
880 {
881 tracing::trace!("list_consumer_groups called");
882 self.inner.list_consumer_groups(request).await
883 }
884
885 pub async fn get_consumer_group_info(
886 &mut self,
887 request: impl tonic::IntoRequest<proto::GetConsumerGroupInfoRequest>,
888 ) -> std::result::Result<tonic::Response<proto::ConsumerGroupInfo>, tonic::Status> {
889 tracing::trace!("get_consumer_group_info called");
890 self.inner.get_consumer_group_info(request).await
891 }
892
893 pub async fn delete_consumer_group(
894 &mut self,
895 request: impl tonic::IntoRequest<proto::DeleteConsumerGroupRequest>,
896 ) -> std::result::Result<tonic::Response<proto::DeleteConsumerGroupResponse>, tonic::Status>
897 {
898 tracing::trace!("delete_consumer_group called");
899 self.inner.delete_consumer_group(request).await
900 }
901
902 pub async fn create_consumer_group(
903 &mut self,
904 request: impl tonic::IntoRequest<proto::CreateConsumerGroupRequest>,
905 ) -> std::result::Result<tonic::Response<proto::CreateConsumerGroupResponse>, tonic::Status>
906 {
907 tracing::trace!("create_consumer_group called");
908 self.inner.create_consumer_group(request).await
909 }
910
911 pub async fn get_chain_tip(
912 &mut self,
913 request: impl tonic::IntoRequest<proto::GetChainTipRequest>,
914 ) -> std::result::Result<tonic::Response<proto::GetChainTipResponse>, tonic::Status> {
915 tracing::trace!("get_chain_tip called");
916 self.inner.get_chain_tip(request).await
917 }
918
919 pub async fn get_slot_range(
920 &mut self,
921 ) -> std::result::Result<tonic::Response<proto::GetSlotRangeResponse>, tonic::Status> {
922 tracing::trace!("get_slot_range called");
923 self.inner
924 .get_slot_range(GetSlotRangeRequest {
925 blockchain_id: Uuid::nil().as_bytes().to_vec(),
926 })
927 .await
928 }
929}