Skip to main content

yellowstone_fumarole_client/
lib.rs

1//!
2//! A Rust implementation of the Yellowstone Fumarole Client using Tokio and Tonic.
3//!
4//! Fumarole Client uses gRPC connections to communicate with the Fumarole service.
5//!
6//! # Yellowstone-GRPC vs Yellowstone-Fumarole
7//!
8//! For the most part, the API is similar to the original [`yellowstone-grpc`] client.
9//!
10//! However, there are some differences:
11//!
//! - The `yellowstone-fumarole` client (coming soon) uses multiple gRPC connections to communicate with the Fumarole service, which avoids [`HoL`] blocking.
13//! - The `yellowstone-fumarole` subscribers are persistent and can be reused across multiple sessions (not computer).
14//! - The `yellowstone-fumarole` can reconnect to the Fumarole service if the connection is lost.
15//!
16//! # Examples
17//!
18//! Examples can be found in the [`examples`] directory.
19//!
20//! ## Create a `FumaroleClient`
21//!
22//! To create a `FumaroleClient`, you need to provide a configuration object.
23//!
24//! ```ignore
25//! use yellowstone_fumarole_client::FumaroleClient;
26//! use yellowstone_fumarole_client::config::FumaroleConfig;
27//!
28//! #[tokio::main]
29//! async fn main() {
30//!     let config = FumaroleConfig {
31//!         endpoint: "https://example.com".to_string(),
32//!         x_token: Some("00000000-0000-0000-0000-000000000000".to_string()),
33//!         max_decoding_message_size_bytes: FumaroleConfig::default_max_decoding_message_size_bytes(),
34//!         x_metadata: Default::default(),
35//!     };
36//!     let fumarole_client = FumaroleClient::connect(config)
37//!         .await
38//!         .expect("Failed to connect to fumarole");
39//! }
40//! ```
41//!
//! The preferred way to create a `FumaroleConfig` is to use `serde_yaml` to deserialize it from a YAML file.
43//!
44//! ```ignore
45//! let config_file = std::fs::File::open("path/to/config.yaml").unwrap();
46//! let config: FumaroleConfig = serde_yaml::from_reader(config_file).unwrap();
47//! ```
48//!
49//! Here's an example of a YAML file:
50//!
51//! ```yaml
52//! endpoint: https://example.com
53//! x-token: 00000000-0000-0000-0000-000000000000
54//! response_compression: zstd
55//! ```
56//!
57//!
58//! ## Dragonsmouth-like Subscribe
59//!
60//! ```rust
61//! use {
62//!     clap::Parser,
63//!     solana_sdk::{bs58, pubkey::Pubkey},
64//!     std::{collections::HashMap, path::PathBuf},
65//!     yellowstone_fumarole_client::{
66//!         config::FumaroleConfig, DragonsmouthAdapterSession, FumaroleClient,
67//!     },
68//!     yellowstone_grpc_proto::geyser::{
69//!         subscribe_update::UpdateOneof, SubscribeRequest,
70//!         SubscribeRequestFilterTransactions, SubscribeUpdateAccount, SubscribeUpdateTransaction,
71//!     },
72//! };
73//!
74//! #[derive(Debug, Clone, Parser)]
75//! #[clap(author, version, about = "Yellowstone Fumarole Example")]
76//! struct Args {
77//!     /// Path to static config file
78//!     #[clap(long)]
79//!     config: PathBuf,
80//!
81//!     #[clap(subcommand)]
82//!     action: Action,
83//! }
84//!
85//! #[derive(Debug, Clone, Parser)]
86//! enum Action {
87//!     /// Subscribe to fumarole events
88//!     Subscribe(SubscribeArgs),
89//! }
90//!
91//! #[derive(Debug, Clone, Parser)]
92//! struct SubscribeArgs {
93//!     /// Name of the persistent subscriber to use
94//!     #[clap(long)]
95//!     name: String,
96//! }
97//!
98//! fn summarize_account(account: SubscribeUpdateAccount) -> Option<String> {
99//!     let slot = account.slot;
100//!     let account = account.account?;
101//!     let pubkey = Pubkey::try_from(account.pubkey).expect("Failed to parse pubkey");
102//!     let owner = Pubkey::try_from(account.owner).expect("Failed to parse owner");
103//!     Some(format!("account,{},{},{}", slot, pubkey, owner))
104//! }
105//!
106//! fn summarize_tx(tx: SubscribeUpdateTransaction) -> Option<String> {
107//!     let slot = tx.slot;
108//!     let tx = tx.transaction?;
109//!     let sig = bs58::encode(tx.signature).into_string();
110//!     Some(format!("tx,{slot},{sig}"))
111//! }
112//!
113//! async fn subscribe(args: SubscribeArgs, config: FumaroleConfig) {
//!     // This request subscribes to all transaction updates (no account filter is set)
115//!     let request = SubscribeRequest {
116//!         transactions: HashMap::from([(
117//!             "f1".to_owned(),
118//!             SubscribeRequestFilterTransactions::default(),
119//!         )]),
120//!         ..Default::default()
121//!     };
122//!
123//!     let mut fumarole_client = FumaroleClient::connect(config)
124//!         .await
125//!         .expect("Failed to connect to fumarole");
126//!
127//!     let dragonsmouth_session = fumarole_client
128//!         .dragonsmouth_subscribe(args.name, request)
129//!         .await
130//!         .expect("Failed to subscribe");
131//!
132//!     let DragonsmouthAdapterSession {
133//!         sink: _,
134//!         mut source,
135//!         mut fumarole_handle,
136//!     } = dragonsmouth_session;
137//!     
138//!     loop {
139//!
140//!         tokio::select! {
141//!             result = &mut fumarole_handle => {
142//!                 eprintln!("Fumarole handle closed: {:?}", result);
143//!                 break;
144//!             }
145//!             maybe = source.recv() => {
146//!                 match maybe {
147//!                     None => {
148//!                         eprintln!("Source closed");
149//!                         break;
150//!                     }
151//!                     Some(result) => {
152//!                         let event = result.expect("Failed to receive event");
153//!                         let message = if let Some(oneof) = event.update_oneof {
154//!                             match oneof {
155//!                                 UpdateOneof::Account(account_update) => {
156//!                                     summarize_account(account_update)
157//!                                 }
158//!                                 UpdateOneof::Transaction(tx) => {
159//!                                     summarize_tx(tx)
160//!                                 }                    
161//!                                 _ => None,
162//!                             }
163//!                         } else {
164//!                             None
165//!                         };
166//!
167//!                         if let Some(message) = message {
168//!                             println!("{}", message);
169//!                         }
170//!                     }
171//!                 }
172//!             }
173//!         }
174//!     }
175//! }
176//!
177//! #[tokio::main]
178//! async fn main() {
179//!     let args: Args = Args::parse();
180//!     let config = std::fs::read_to_string(&args.config).expect("Failed to read config file");
181//!     let config: FumaroleConfig =
182//!         serde_yaml::from_str(&config).expect("Failed to parse config file");
183//!
184//!     match args.action {
185//!         Action::Subscribe(sub_args) => {
186//!             subscribe(sub_args, config).await;
187//!         }
188//!     }
189//! }
190//! ```
191//!
192//! ## High-traffic workload: parallel subscription + zstd
193//!
//! For high-traffic workloads or higher-latency connections, using parallel subscriptions together with zstd compression greatly improves performance and helps the client stay on tip.
195//!
196//! Inside your `config.yaml` file enable compression with `response_compression: zstd`:
197//!
198//! ```yaml
199//! endpoint: https://fumarole.endpoint.rpcpool.com
200//! x-token: 00000000-0000-0000-0000-000000000000
201//! response_compression: zstd
202//! ```
203//!
//! Use `FumaroleSubscribeConfig` to configure the parallel subscription (zstd compression is enabled in `config.yaml`, as shown above).
205//!
206//! ```rust
207//! let config: FumaroleConfig = serde_yaml::from_reader("<path/to/config.yaml>").expect("failed to parse fumarole config");
208//!
209//! let request = SubscribeRequest {
210//!    transactions: HashMap::from([(
211//!        "f1".to_owned(),
212//!        SubscribeRequestFilterTransactions::default(),
213//!    )]),
214//!    ..Default::default()
215//! };
216//!
217//!
218//! let mut fumarole_client = FumaroleClient::connect(config)
219//!    .await
220//!    .expect("Failed to connect to fumarole");
221//!
222//! let subscribe_config = FumaroleSubscribeConfig {
223//!    num_data_plane_tcp_connections: NonZeroU8::new(4).unwrap(), // maximum of 4 TCP connections is allowed
224//!    ..Default::default()
225//! };
226//!
227//! let dragonsmouth_session = fumarole_client
228//!    .dragonsmouth_subscribe_with_config(args.name, request, subscribe_config)
229//!    .await
230//!    .expect("Failed to subscribe");
231//! ```
232//!
233//!
234//! ## Enable Prometheus Metrics
235//!
//! To enable Prometheus metrics, add `features = ["prometheus"]` to the dependency in your `Cargo.toml` file:
237//! ```toml
238//! [dependencies]
239//! yellowstone-fumarole-client = { version = "x.y.z", features = ["prometheus"] }
240//! ```
241//!
242//! Then, you can use the `metrics` module to register and expose metrics:
243//!
244//! ```rust
245//! use yellowstone_fumarole_client::metrics;
246//! use prometheus::{Registry};
247//!
248//! let r = Registry::new();
249//!
250//! metrics::register_metrics(&r);
251//!
252//! // After registering, you should see `fumarole_` prefixed metrics in the registry.
253//! ```
254//!
255//! # Getting Started
256//!
//! Follow the instructions in the [`README`] file to get started.
258//!
259//! # Feature Flags
260//!
261//! - `prometheus`: Enables Prometheus metrics for the Fumarole client.
262//!
263//! [`examples`]: https://github.com/rpcpool/yellowstone-fumarole/tree/main/examples
264//! [`README`]: https://github.com/rpcpool/yellowstone-fumarole/tree/main/README.md
265//! [`yellowstone-grpc`]: https://github.com/rpcpool/yellowstone-grpc
266//! [`HoL`]: https://en.wikipedia.org/wiki/Head-of-line_blocking
267
268pub mod config;
269
270#[cfg(feature = "prometheus")]
271pub mod metrics;
272
273pub(crate) mod grpc;
274pub(crate) mod runtime;
275pub(crate) mod util;
276
277use {
278    crate::proto::GetSlotRangeRequest,
279    config::FumaroleConfig,
280    futures::future::{Either, select},
281    proto::control_response::Response,
282    runtime::{
283        state_machine::{DEFAULT_SLOT_MEMORY_RETENTION, FumaroleSM},
284        tokio::{
285            DEFAULT_GC_INTERVAL, DownloadTaskRunnerChannels, LegacyGrpcDownloadTaskRunner,
286            TokioFumeDragonsmouthRuntime,
287        },
288    },
289    semver::Version,
290    std::{
291        collections::HashMap,
292        num::{NonZeroU8, NonZeroUsize},
293        sync::Arc,
294        time::{Duration, Instant},
295    },
296    tokio::sync::mpsc,
297    tokio_stream::wrappers::ReceiverStream,
298    tonic::{
299        metadata::{
300            Ascii, MetadataKey, MetadataValue,
301            errors::{InvalidMetadataKey, InvalidMetadataValue},
302        },
303        service::{Interceptor, interceptor::InterceptedService},
304        transport::{Channel, ClientTlsConfig},
305    },
306    util::grpc::into_bounded_mpsc_rx,
307    uuid::Uuid,
308};
309
310mod solana {
311    #[allow(unused_imports)]
312    pub use yellowstone_grpc_proto::solana::{
313        storage,
314        storage::{confirmed_block, confirmed_block::*},
315    };
316}
317
318mod geyser {
319    pub use yellowstone_grpc_proto::geyser::*;
320}
321
322#[allow(clippy::missing_const_for_fn)]
323#[allow(clippy::all)]
324pub mod proto {
325    tonic::include_proto!("fumarole");
326}
327
328use {
329    crate::grpc::FumaroleGrpcConnector,
330    proto::{JoinControlPlane, fumarole_client::FumaroleClient as TonicFumaroleClient},
331    runtime::tokio::DataPlaneConn,
332    tonic::transport::Endpoint,
333};
334
335#[derive(Clone)]
336struct FumeInterceptor {
337    x_token: Option<MetadataValue<Ascii>>,
338    metadata: HashMap<MetadataKey<Ascii>, MetadataValue<Ascii>>,
339}
340
341impl Interceptor for FumeInterceptor {
342    fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
343        let mut request = request;
344        let metadata = request.metadata_mut();
345        if let Some(x_token) = &self.x_token {
346            metadata.insert("x-token", x_token.clone());
347        }
348        for (key, value) in &self.metadata {
349            metadata.insert(key.clone(), value.clone());
350        }
351        Ok(request)
352    }
353}
354
355///
356/// A builder for creating a [`FumaroleClient`].
357///
358#[derive(Default)]
359pub struct FumaroleClientBuilder {
360    pub metadata: HashMap<MetadataKey<Ascii>, MetadataValue<Ascii>>,
361    pub with_compression: bool,
362}
363
364#[derive(Debug, thiserror::Error)]
365pub enum InvalidMetadataHeader {
366    #[error(transparent)]
367    InvalidMetadataKey(#[from] InvalidMetadataKey),
368    #[error(transparent)]
369    InvalidMetadataValue(#[from] InvalidMetadataValue),
370}
371
372#[derive(Debug, thiserror::Error)]
373pub enum ConnectError {
374    #[error(transparent)]
375    InvalidUri(#[from] http::uri::InvalidUri),
376    #[error(transparent)]
377    TransportError(#[from] tonic::transport::Error),
378    #[error(transparent)]
379    InvalidXToken(#[from] tonic::metadata::errors::InvalidMetadataValue),
380    #[error(transparent)]
381    InvalidMetadataHeader(#[from] InvalidMetadataHeader),
382}
383
///
/// Default capacity of the Dragonsmouth gRPC update buffer.
///
pub const DEFAULT_DRAGONSMOUTH_CAPACITY: usize = 10_000_000;

///
/// Default interval between two Fumarole offset commits.
///
pub const DEFAULT_COMMIT_INTERVAL: Duration = Duration::from_secs(10);

///
/// Default maximum number of consecutive failed slot download attempts before the fumarole session fails.
///
pub const DEFAULT_MAX_SLOT_DOWNLOAD_ATTEMPT: usize = 3;

///
/// Upper bound on the number of parallel data streams (TCP connections) that may be opened to fumarole.
///
const MAX_PARA_DATA_STREAMS: u8 = 20;

///
/// Default number of parallel data streams (TCP connections) opened to fumarole.
///
pub const DEFAULT_PARA_DATA_STREAMS: u8 = 4;

///
/// Default maximum number of concurrent download requests issued to the fumarole service over a single data plane TCP connection.
///
pub const DEFAULT_CONCURRENT_DOWNLOAD_LIMIT_PER_TCP: usize = 2;

///
/// Default interval at which the fumarole client refreshes the tip.
/// Only useful when the `prometheus` feature flag is enabled.
///
pub const DEFAULT_REFRESH_TIP_INTERVAL: Duration = Duration::from_secs(5);
419
420pub(crate) type GrpcFumaroleClient =
421    TonicFumaroleClient<InterceptedService<Channel, FumeInterceptor>>;
422///
423/// Yellowstone Fumarole SDK.
424///
425#[derive(Clone)]
426pub struct FumaroleClient {
427    connector: FumaroleGrpcConnector,
428    inner: GrpcFumaroleClient,
429}
430
431#[derive(Debug, thiserror::Error)]
432pub enum DragonsmouthSubscribeError {
433    #[error(transparent)]
434    GrpcStatus(#[from] tonic::Status),
435    #[error("grpc stream closed")]
436    StreamClosed,
437}
438
439#[derive(Debug, thiserror::Error)]
440pub enum FumaroleStreamError {
441    #[error(transparent)]
442    Custom(Box<dyn std::error::Error + Send + Sync>),
443    #[error("grpc stream closed")]
444    StreamClosed,
445}
446
///
/// Tunables of a Fumarole subscription session.
///
pub struct FumaroleSubscribeConfig {
    ///
    /// How many parallel data streams (TCP connections) are opened to fumarole.
    ///
    pub num_data_plane_tcp_connections: NonZeroU8,

    ///
    /// Upper bound on concurrent download requests sent to the fumarole service over one data plane TCP connection.
    ///
    pub concurrent_download_limit_per_tcp: NonZeroUsize,

    ///
    /// Interval at which the fumarole client commits its offset.
    ///
    pub commit_interval: Duration,

    ///
    /// Number of consecutive failed slot download attempts tolerated before the fumarole session fails.
    ///
    pub max_failed_slot_download_attempt: usize,

    ///
    /// Capacity of each data channel used by the fumarole client.
    ///
    pub data_channel_capacity: NonZeroUsize,

    ///
    /// Garbage collection interval, expressed in ticks (loop iterations of the fumarole runtime).
    ///
    pub gc_interval: usize,

    ///
    /// How far back the fumarole client retains slot memory.
    /// This avoids downloading the same slot multiple times.
    ///
    pub slot_memory_retention: usize,

    ///
    /// Interval at which tip stats are refreshed from the fumarole service.
    ///
    pub refresh_tip_stats_interval: Duration,

    ///
    /// Disables committing offsets to the fumarole service.
    /// Useful for testing, or when you do not care about committing offsets.
    /// When `true`, the current session never commits its progression and
    /// [`FumaroleSubscribeConfig::commit_interval`] is ignored.
    ///
    pub no_commit: bool,

    ///
    /// Enables sharded block download: blocks are downloaded in shards and reassembled in the client.
    /// `true` by default.
    ///
    pub enable_sharded_block_download: bool,
}
505
506impl Default for FumaroleSubscribeConfig {
507    fn default() -> Self {
508        Self {
509            num_data_plane_tcp_connections: NonZeroU8::new(DEFAULT_PARA_DATA_STREAMS).unwrap(),
510            concurrent_download_limit_per_tcp: NonZeroUsize::new(
511                DEFAULT_CONCURRENT_DOWNLOAD_LIMIT_PER_TCP,
512            )
513            .unwrap(),
514            commit_interval: DEFAULT_COMMIT_INTERVAL,
515            max_failed_slot_download_attempt: DEFAULT_MAX_SLOT_DOWNLOAD_ATTEMPT,
516            data_channel_capacity: NonZeroUsize::new(DEFAULT_DRAGONSMOUTH_CAPACITY).unwrap(),
517            gc_interval: DEFAULT_GC_INTERVAL,
518            slot_memory_retention: DEFAULT_SLOT_MEMORY_RETENTION,
519            refresh_tip_stats_interval: DEFAULT_REFRESH_TIP_INTERVAL, // Default to 5 seconds
520            no_commit: false,
521            enable_sharded_block_download: true,
522        }
523    }
524}
525
///
/// Errors related to the Fumarole control plane.
///
pub enum FumeControlPlaneError {
    /// The control plane is disconnected.
    Disconnected,
}
529
///
/// Errors related to the Fumarole data plane.
///
pub enum FumeDataPlaneError {
    /// The data plane is disconnected.
    Disconnected,
}
533
///
/// Coarse classification of Fumarole failures.
///
pub enum FumaroleError {
    /// The control plane connection was lost.
    ControlPlaneDisconnected,
    /// The data plane connection was lost.
    DataPlaneDisconnected,
    /// The subscribe request was rejected as invalid.
    InvalidSubscribeRequest,
}
539
540impl From<tonic::Status> for FumaroleError {
541    fn from(status: tonic::Status) -> Self {
542        match status.code() {
543            tonic::Code::Unavailable => FumaroleError::ControlPlaneDisconnected,
544            tonic::Code::Internal => FumaroleError::DataPlaneDisconnected,
545            _ => FumaroleError::InvalidSubscribeRequest,
546        }
547    }
548}
549
550///
551/// Dragonsmouth flavor fumarole session.
552/// Mimics the same API as dragonsmouth but uses fumarole as the backend.
553///
554pub struct DragonsmouthAdapterSession {
555    ///
556    /// Channel to send requests to the fumarole service.
557    /// If you don't need to change the subscribe request, you can drop this channel.
558    ///
559    pub sink: mpsc::Sender<geyser::SubscribeRequest>,
560    ///
561    /// Channel to receive updates from the fumarole service.
562    /// Dropping this channel will stop the fumarole session.
563    ///
564    pub source: mpsc::Receiver<Result<geyser::SubscribeUpdate, tonic::Status>>,
565    ///
566    /// Handle to the fumarole session client runtime.
567    /// Dropping this handle does not stop the fumarole session.
568    ///
569    /// If you want to stop the fumarole session, you need to drop the [`DragonsmouthAdapterSession::source`] channel,
570    /// then you could wait for the handle to finish.
571    ///
572    pub fumarole_handle: tokio::task::JoinHandle<()>,
573}
574
575fn string_pairs_to_metadata_header(
576    headers: impl IntoIterator<Item = (impl AsRef<str>, impl AsRef<str>)>,
577) -> Result<HashMap<MetadataKey<Ascii>, MetadataValue<Ascii>>, InvalidMetadataHeader> {
578    headers
579        .into_iter()
580        .map(|(k, v)| {
581            let key = MetadataKey::from_bytes(k.as_ref().as_bytes())?;
582            let value: MetadataValue<Ascii> = v.as_ref().try_into()?;
583            Ok((key, value))
584        })
585        .collect()
586}
587
588impl FumaroleClient {
589    pub async fn connect(config: FumaroleConfig) -> Result<FumaroleClient, ConnectError> {
590        let connection_window_size: u32 = config
591            .initial_connection_window_size
592            .as_u64()
593            .try_into()
594            .expect("initial_connection_window_size must fit in u32");
595        let stream_window_size: u32 = config
596            .initial_stream_window_size
597            .as_u64()
598            .try_into()
599            .expect("initial_stream_window_size must fit in u32");
600
601        let mut tonic_endpoints = Vec::with_capacity(1);
602        #[allow(clippy::single_element_loop)]
603        for endpoint_str in [config.endpoint.clone()] {
604            let endpoints = Endpoint::from_shared(endpoint_str)?
605                .tls_config(ClientTlsConfig::new().with_native_roots())?
606                .initial_connection_window_size(connection_window_size)
607                .initial_stream_window_size(stream_window_size)
608                .http2_adaptive_window(config.enable_http2_adaptive_window);
609            tonic_endpoints.push(endpoints);
610        }
611
612        let connector = FumaroleGrpcConnector {
613            config: config.clone(),
614            endpoints: tonic_endpoints,
615            connect_cnt: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
616        };
617
618        let client = connector.connect().await?;
619        Ok(FumaroleClient {
620            connector,
621            inner: client,
622        })
623    }
624
625    ///
626    /// Returns the current version of the Fumarole service.
627    ///
628    pub async fn version(&mut self) -> Result<proto::VersionResponse, tonic::Status> {
629        let request = tonic::Request::new(proto::VersionRequest {});
630        let response = self.inner.version(request).await?;
631        Ok(response.into_inner())
632    }
633
634    ///
635    /// Subscribe to a stream of updates from the Fumarole service
636    ///
637    pub async fn dragonsmouth_subscribe<S>(
638        &mut self,
639        subscriber_name: S,
640        request: geyser::SubscribeRequest,
641    ) -> Result<DragonsmouthAdapterSession, tonic::Status>
642    where
643        S: AsRef<str>,
644    {
645        let handle = tokio::runtime::Handle::current();
646        self.dragonsmouth_subscribe_with_config_on(
647            subscriber_name,
648            request,
649            Default::default(),
650            handle,
651        )
652        .await
653    }
654
655    pub async fn dragonsmouth_subscribe_with_config<S>(
656        &mut self,
657        consumer_group_name: S,
658        request: geyser::SubscribeRequest,
659        config: FumaroleSubscribeConfig,
660    ) -> Result<DragonsmouthAdapterSession, tonic::Status>
661    where
662        S: AsRef<str>,
663    {
664        let handle = tokio::runtime::Handle::current();
665        self.dragonsmouth_subscribe_with_config_on(consumer_group_name, request, config, handle)
666            .await
667    }
668
669    ///
670    /// Same as [`FumaroleClient::dragonsmouth_subscribe`] but allows you to specify a custom runtime handle
671    /// the underlying fumarole runtie will use
672    ///
673    pub async fn dragonsmouth_subscribe_with_config_on<S>(
674        &mut self,
675        subscriber_name: S,
676        request: geyser::SubscribeRequest,
677        config: FumaroleSubscribeConfig,
678        handle: tokio::runtime::Handle,
679    ) -> Result<DragonsmouthAdapterSession, tonic::Status>
680    where
681        S: AsRef<str>,
682    {
683        let version = self.version().await?;
684        let semver_version = Version::parse(&version.version).ok();
685        if let Some(semver_version) = &semver_version {
686            tracing::debug!("Fumarole service version: {}", semver_version);
687        } else {
688            tracing::warn!(
689                "Failed to parse fumarole service version: {}",
690                version.version
691            );
692        }
693        const SHARDED_DOWNLOAD_MINIMUM_MINOR_VERSION: u64 = 39;
694        let use_sharded_downlaod = config.enable_sharded_block_download
695            && semver_version
696                .filter(|v| v.minor > SHARDED_DOWNLOAD_MINIMUM_MINOR_VERSION)
697                .is_none();
698
699        if config.enable_sharded_block_download && !use_sharded_downlaod {
700            tracing::warn!(
701                "Sharded block download is enabled in the config, but the fumarole service version {} does not support it. Falling back to non-sharded download.",
702                version.version
703            );
704        }
705
706        if use_sharded_downlaod {
707            tracing::debug!("using sharded block download");
708        } else {
709            tracing::debug!("using non-sharded block download");
710        }
711
712        let request = Arc::new(request);
713        assert!(
714            config.num_data_plane_tcp_connections.get() <= MAX_PARA_DATA_STREAMS,
715            "num_data_plane_tcp_connections must be less than or equal to {MAX_PARA_DATA_STREAMS}"
716        );
717
718        assert!(
719            config.refresh_tip_stats_interval >= Duration::from_secs(5),
720            "refresh_tip_stats_interval must be greater than or equal to 5 seconds"
721        );
722
723        use {proto::ControlCommand, runtime::tokio::DragonsmouthSubscribeRequestBidi};
724
725        let (dragonsmouth_outlet, dragonsmouth_inlet) =
726            mpsc::channel(DEFAULT_DRAGONSMOUTH_CAPACITY);
727        let (fume_control_plane_tx, fume_control_plane_rx) = mpsc::channel(100);
728
729        let initial_join = JoinControlPlane {
730            consumer_group_name: Some(subscriber_name.as_ref().to_string()),
731        };
732        let initial_join_command = ControlCommand {
733            command: Some(proto::control_command::Command::InitialJoin(initial_join)),
734        };
735
736        // IMPORTANT: Make sure we send the request here before we subscribe to the stream
737        // Otherwise this will block until timeout by remote server.
738        fume_control_plane_tx
739            .send(initial_join_command)
740            .await
741            .expect("failed to send initial join");
742
743        let resp = if use_sharded_downlaod {
744            self.inner
745                .subscribe_v2(ReceiverStream::new(fume_control_plane_rx))
746                .await?
747        } else {
748            self.inner
749                .subscribe(ReceiverStream::new(fume_control_plane_rx))
750                .await?
751        };
752
753        let mut streaming = resp.into_inner();
754        let fume_control_plane_tx = fume_control_plane_tx.clone();
755        let control_response = streaming.message().await?.expect("none");
756        let fume_control_plane_rx = into_bounded_mpsc_rx(100, streaming);
757        let response = control_response.response.expect("none");
758        let Response::Init(initial_state) = response else {
759            panic!("unexpected initial response: {response:?}")
760        };
761
762        /* WE DON'T SUPPORT SHARDING YET */
763        assert!(
764            initial_state.last_committed_offsets.len() == 1,
765            "sharding not supported"
766        );
767        let last_committed_offset = initial_state
768            .last_committed_offsets
769            .get(&0)
770            .expect("no last committed offset");
771
772        let sm = FumaroleSM::new(*last_committed_offset, config.slot_memory_retention);
773
774        let (dm_tx, dm_rx) = mpsc::channel(config.data_channel_capacity.get());
775        let dm_bidi = DragonsmouthSubscribeRequestBidi {
776            tx: dm_tx.clone(),
777            rx: dm_rx,
778        };
779
780        let mut data_plane_channel_vec =
781            Vec::with_capacity(config.num_data_plane_tcp_connections.get() as usize);
782        // TODO: support config.num_data_plane_tcp_connections
783        for _ in 0..config.num_data_plane_tcp_connections.get() {
784            let client = self
785                .connector
786                .connect()
787                .await
788                .expect("failed to connect to fumarole");
789            let conn = DataPlaneConn::new(client);
790            data_plane_channel_vec.push(conn);
791        }
792        let (download_task_runner_cnc_tx, download_task_runner_cnc_rx) = mpsc::channel(10);
793        // Make sure the channel capacity is really low, since the grpc runner already implements its own concurrency control
794        let (download_task_queue_tx, download_task_queue_rx) = mpsc::channel(100);
795        let (download_result_tx, download_result_rx) = mpsc::channel(1000);
796
797        let download_task_runner_jh = if use_sharded_downlaod {
798            let grpc_download_task_runner = runtime::tokio::GrpcShardedDownloadOrchestrator::new(
799                data_plane_channel_vec,
800                self.connector.clone(),
801                download_task_runner_cnc_rx,
802                download_task_queue_rx,
803                download_result_tx,
804                config.max_failed_slot_download_attempt,
805                Arc::clone(&request),
806                config.concurrent_download_limit_per_tcp,
807                dragonsmouth_outlet.clone(),
808            );
809            handle.spawn(grpc_download_task_runner.run())
810        } else {
811            let grpc_download_task_runner = LegacyGrpcDownloadTaskRunner::new(
812                data_plane_channel_vec,
813                self.connector.clone(),
814                download_task_runner_cnc_rx,
815                download_task_queue_rx,
816                download_result_tx,
817                config.max_failed_slot_download_attempt,
818                Arc::clone(&request),
819                config.concurrent_download_limit_per_tcp.get()
820                    * config.num_data_plane_tcp_connections.get() as usize,
821                dragonsmouth_outlet.clone(),
822            );
823
824            handle.spawn(grpc_download_task_runner.run())
825        };
826
827        let download_task_runner_chans = DownloadTaskRunnerChannels {
828            download_task_queue_tx,
829            cnc_tx: download_task_runner_cnc_tx,
830            download_result_rx,
831        };
832
833        let tokio_rt = TokioFumeDragonsmouthRuntime {
834            sm,
835            fumarole_client: self.clone(),
836            blockchain_id: initial_state.blockchain_id,
837            dragonsmouth_bidi: dm_bidi,
838            subscribe_request: request,
839            download_task_runner_chans,
840            persistent_subscriber_name: subscriber_name.as_ref().to_string(),
841            control_plane_tx: fume_control_plane_tx,
842            control_plane_rx: fume_control_plane_rx,
843            dragonsmouth_outlet,
844            commit_interval: config.commit_interval,
845            last_commit: Instant::now(),
846            get_tip_interval: config.refresh_tip_stats_interval,
847            last_tip: Instant::now(),
848            gc_interval: config.gc_interval,
849            non_critical_background_jobs: Default::default(),
850            last_history_poll: Default::default(),
851            no_commit: config.no_commit,
852            stop: false,
853            enable_sharded_block_download: use_sharded_downlaod,
854        };
855        let fumarole_rt_jh = handle.spawn(tokio_rt.run());
856        let fut = async move {
857            let either = select(download_task_runner_jh, fumarole_rt_jh).await;
858            match either {
859                Either::Left((result, _)) => {
860                    let _ = result.expect("fumarole download task runner failed");
861                }
862                Either::Right((result, _)) => {
863                    let _ = result.expect("fumarole runtime failed");
864                }
865            }
866        };
867        let fumarole_handle = handle.spawn(fut);
868        let dm_session = DragonsmouthAdapterSession {
869            sink: dm_tx,
870            source: dragonsmouth_inlet,
871            fumarole_handle,
872        };
873        Ok(dm_session)
874    }
875
876    pub async fn list_consumer_groups(
877        &mut self,
878        request: impl tonic::IntoRequest<proto::ListConsumerGroupsRequest>,
879    ) -> std::result::Result<tonic::Response<proto::ListConsumerGroupsResponse>, tonic::Status>
880    {
881        tracing::trace!("list_consumer_groups called");
882        self.inner.list_consumer_groups(request).await
883    }
884
885    pub async fn get_consumer_group_info(
886        &mut self,
887        request: impl tonic::IntoRequest<proto::GetConsumerGroupInfoRequest>,
888    ) -> std::result::Result<tonic::Response<proto::ConsumerGroupInfo>, tonic::Status> {
889        tracing::trace!("get_consumer_group_info called");
890        self.inner.get_consumer_group_info(request).await
891    }
892
893    pub async fn delete_consumer_group(
894        &mut self,
895        request: impl tonic::IntoRequest<proto::DeleteConsumerGroupRequest>,
896    ) -> std::result::Result<tonic::Response<proto::DeleteConsumerGroupResponse>, tonic::Status>
897    {
898        tracing::trace!("delete_consumer_group called");
899        self.inner.delete_consumer_group(request).await
900    }
901
902    pub async fn create_consumer_group(
903        &mut self,
904        request: impl tonic::IntoRequest<proto::CreateConsumerGroupRequest>,
905    ) -> std::result::Result<tonic::Response<proto::CreateConsumerGroupResponse>, tonic::Status>
906    {
907        tracing::trace!("create_consumer_group called");
908        self.inner.create_consumer_group(request).await
909    }
910
911    pub async fn get_chain_tip(
912        &mut self,
913        request: impl tonic::IntoRequest<proto::GetChainTipRequest>,
914    ) -> std::result::Result<tonic::Response<proto::GetChainTipResponse>, tonic::Status> {
915        tracing::trace!("get_chain_tip called");
916        self.inner.get_chain_tip(request).await
917    }
918
919    pub async fn get_slot_range(
920        &mut self,
921    ) -> std::result::Result<tonic::Response<proto::GetSlotRangeResponse>, tonic::Status> {
922        tracing::trace!("get_slot_range called");
923        self.inner
924            .get_slot_range(GetSlotRangeRequest {
925                blockchain_id: Uuid::nil().as_bytes().to_vec(),
926            })
927            .await
928    }
929}