titanrt 0.7.0

Typed reactive runtime for real-time systems
Documentation
use crate::connector::BaseConnector;
use crate::connector::features::grpc::client::{GrpcChannelSpec, GrpcClient};
use crate::connector::features::shared::clients_map::{ClientConfig, ClientsMap, SpecificClient};
use crate::utils::{CancelToken, CoreStats};

use serde::{Deserialize, Serialize};
use std::{fmt::Display, sync::Arc};
use tokio::runtime::{Builder, Runtime};

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GrpcConnectorConfig {
    pub default_max_cores: Option<usize>,
    pub specific_core_ids: Vec<usize>,
    pub use_core_stats: bool,
    pub client: ClientConfig<GrpcChannelSpec>,
}

pub struct GrpcConnector {
    config: GrpcConnectorConfig,
    clients_map: ClientsMap<GrpcClient, GrpcChannelSpec>,
    cancel_token: CancelToken,
    core_stats: Option<Arc<CoreStats>>,
    rt_tokio: Arc<tokio::runtime::Runtime>,
}

impl GrpcConnector {
    pub fn clients_map(&self) -> ClientsMap<GrpcClient, GrpcChannelSpec> {
        self.clients_map.clone()
    }

    pub fn upsert_client(
        &self,
        client: SpecificClient<GrpcChannelSpec>,
    ) -> anyhow::Result<Arc<GrpcClient>> {
        self.clients_map.upsert(client)
    }

    pub fn remove_client(&self, id: usize) -> Option<Arc<GrpcClient>> {
        self.clients_map.remove(id)
    }

    pub fn next_client_id(&self) -> usize {
        self.clients_map.next_vacant_id()
    }

    pub fn rt_tokio(&self) -> Arc<Runtime> {
        self.rt_tokio.clone()
    }
}

impl BaseConnector for GrpcConnector {
    type MainConfig = GrpcConnectorConfig;
    ///  создаются gRPC-каналы (tonic::transport::Channel) через create_channel().
    fn init(
        config: Self::MainConfig,
        cancel_token: CancelToken,
        reserved_core_ids: Option<Vec<usize>>,
    ) -> anyhow::Result<Self> {
        let core_stats = if config.use_core_stats {
            Some(CoreStats::new(
                config.default_max_cores,
                config.specific_core_ids.clone(),
                reserved_core_ids.unwrap_or_default(),
            )?)
        } else {
            None
        };

        let rt_tokio = Arc::new(
            Builder::new_current_thread()
                .enable_io()
                .enable_time()
                .build()
                .map_err(|e| anyhow::anyhow!("Tokio Runtime error: {e}"))?,
        );

        let clients_map = ClientsMap::new(&config.client, Some(rt_tokio.clone()))?;

        Ok(Self {
            config,
            clients_map,
            cancel_token,
            core_stats,
            rt_tokio,
        })
    }

    fn name(&self) -> impl AsRef<str> + Display {
        "tonic"
    }
    fn config(&self) -> &Self::MainConfig {
        &self.config
    }
    fn cancel_token(&self) -> &CancelToken {
        &self.cancel_token
    }
    fn cores_stats(&self) -> Option<Arc<CoreStats>> {
        self.core_stats.clone()
    }
}

impl Display for GrpcConnector {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "TonicConnector")
    }
}

#[cfg(test)]
mod tests {

    use tracing_appender::non_blocking::WorkerGuard;

    use crate::{
        connector::{
            BaseConnector, EventTxType, HookArgs,
            features::{
                grpc::{
                    client::GrpcChannelSpec,
                    connector::{GrpcConnector, GrpcConnectorConfig},
                    grpcbin::{
                        DummyMessage, SubscribeRequest, SubscribeRequestFilterBlocks,
                        SubscribeRequestFilterSlots, dummy_message,
                    },
                    stream::{
                        GrpcDescriptor, GrpcEvent, GrpcMethod, GrpcStreamCommand, GrpcStreamMode,
                        GrpcUnaryCall,
                    },
                },
                shared::{clients_map::SpecificClient, events::StreamEventRaw},
            },
        },
        io::ringbuffer::RingSender,
        utils::{CancelToken, NullReducer, NullState, logger::LoggerConfig},
    };

    fn logging_init(level: &str) -> Option<WorkerGuard> {
        let mut logger = LoggerConfig::default();
        logger.level = level.to_string();
        logger.init().unwrap()
    }

    fn tonic_conn_init(uri: &str, tls: bool, conn_id: usize) -> GrpcConnector {
        let cancel = CancelToken::new_root();
        GrpcConnector::init(
            GrpcConnectorConfig {
                default_max_cores: Some(4),
                specific_core_ids: vec![],
                use_core_stats: true,
                client: crate::connector::features::shared::clients_map::ClientConfig {
                    default: None,
                    specific: vec![SpecificClient {
                        id: conn_id,
                        ip: None,
                        spec: GrpcChannelSpec {
                            uri: uri.to_string(),
                            connect_timeout_ms: Some(10000),
                            request_timeout_ms: Some(10000),
                            tcp_nodelay: Some(true),
                            http2_keepalive_interval_ms: Some(10000),
                            http2_keepalive_timeout_ms: Some(10000),
                            tls: tls,
                        },
                    }],
                    fail_on_empty: false,
                },
            },
            cancel,
            None,
        )
        .unwrap()
    }

    #[test]
    fn test_grpc_streaming() {
        let _logger = logging_init("debug");

        let conn_id = 0;

        let mut grpc_conn = tonic_conn_init(
            "https://yellowstone-solana-mainnet.core.chainstack.com",
            true,
            conn_id,
        );

        let streaming_descriptor = GrpcDescriptor::<()>::high_throughput();

        let mut geyser_stream = grpc_conn
            .spawn_stream(
                streaming_descriptor,
                EventTxType::Own,
                streaming_geyser_hook,
            )
            .unwrap();

        let mut req = SubscribeRequest::default();

        req.slots.insert(
            "slots-main".into(),
            SubscribeRequestFilterSlots {
                filter_by_commitment: Some(true),
                interslot_updates: Some(false),
            },
        );

        let connection = GrpcStreamCommand::connect(
            GrpcMethod::Full {
                pkg: "geyser",
                service: "Geyser",
                method: "Subscribe",
            },
            GrpcStreamMode::Bidi,
        )
        .subscription(req)
        .header_kv("x-token", "--token-value--")
        .build()
        .to_builder()
        .conn_id(conn_id)
        .build();

        geyser_stream.try_send(connection).unwrap();

        std::thread::sleep(std::time::Duration::from_secs(5));

        let mut req = SubscribeRequest::default();

        req.blocks.insert(
            "blocks".into(),
            SubscribeRequestFilterBlocks {
                account_include: vec!["6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P".to_string()],
                include_transactions: Some(true),
                include_accounts: Some(false),
                include_entries: Some(false),
            },
        );

        geyser_stream
            .try_send(
                GrpcStreamCommand::send(req)
                    .to_builder()
                    .conn_id(conn_id)
                    .build(),
            )
            .unwrap();

        std::thread::sleep(std::time::Duration::from_secs(5));

        geyser_stream.cancel();
    }

    pub fn streaming_geyser_hook(
        args: HookArgs<
            StreamEventRaw<GrpcEvent>,
            RingSender<()>,
            NullReducer,
            NullState,
            GrpcDescriptor<()>,
            (),
        >,
    ) {
        tracing::debug!("Geyser hook: event={:?}", args.raw);
    }

    #[test]
    fn test_unary_stream() {
        let _logger = logging_init("debug");
        let conn_id = 0;
        let mut tonic_conn = tonic_conn_init("https://grpcb.in:9001", true, conn_id);

        let mut jito_unary_descriptor = GrpcDescriptor::<()>::high_throughput();
        jito_unary_descriptor.max_decoding_message_size = Some(10 * 1024 * 1024);
        jito_unary_descriptor.max_encoding_message_size = Some(10 * 1024 * 1024);

        let mut grpc_unary_stream = tonic_conn
            .spawn_stream(jito_unary_descriptor, EventTxType::Own, test_unary_hook)
            .expect("spawn grpc stream failed");

        let msg = DummyMessage {
            f_string: "test".to_string(),
            f_strings: vec!["test1".to_string(), "test2".to_string()],
            f_int32: 42,
            f_int32s: vec![1, 2, 3],
            f_enum: dummy_message::Enum::Enum1 as i32,
            f_enums: vec![dummy_message::Enum::Enum2 as i32],
            f_sub: Some(dummy_message::Sub {
                f_string: "sub_test".to_string(),
            }),
            f_subs: vec![dummy_message::Sub {
                f_string: "sub_test1".to_string(),
            }],
            f_bool: true,
            f_bools: vec![true, false, true],
            f_int64: 123456789,
            f_int64s: vec![987654321, 123456789],
            f_bytes: vec![1, 2, 3, 4, 5],
            f_bytess: vec![vec![10, 20, 30], vec![40, 50, 60]],
            f_float: 0.5,
            f_floats: vec![1.1, 2.2, 3.3],
        };

        let action = GrpcUnaryCall::new(
            GrpcMethod::Full {
                pkg: "grpcbin",
                service: "GRPCBin",
                method: "DummyUnary",
            },
            msg,
        )
        .to_builder()
        .conn_id(conn_id)
        .build();

        match grpc_unary_stream.try_send(action) {
            Ok(_) => {
                tracing::info!("Sent unary action");
            }
            Err(e) => tracing::info!("Failed to send unary action: {:?}", e),
        }

        std::thread::sleep(std::time::Duration::from_secs(5));

        grpc_unary_stream.cancel();
    }

    pub fn test_unary_hook(
        args: HookArgs<
            StreamEventRaw<GrpcEvent>,
            RingSender<()>,
            NullReducer,
            NullState,
            GrpcDescriptor<()>,
            (),
        >,
    ) {
        if let Ok(m) = args.raw.inner().decode_as::<DummyMessage>() {
            tracing::info!("Received message: {:?}", m);
        } else {
            tracing::info!("Failed to decode message");
        }
    }
}