use std::time::Duration;
use async_trait::async_trait;
use sof_support::time_support::nonzero_duration_or;
use tonic::{
Request, Status,
client::Grpc,
codegen::http::uri::PathAndQuery,
transport::{Channel, ClientTlsConfig, Endpoint},
};
use tonic_prost::ProstCodec;
use super::{
JitoBlockEngineEndpoint, JitoSubmitConfig, JitoSubmitResponse, JitoSubmitTransport,
JitoTransportConfig, SubmitTransportError,
};
/// Ten-second timeout handed to `nonzero_duration_or` as the fallback value when the gRPC
/// endpoint is built in `JitoGrpcTransport::with_config`.
///
/// NOTE(review): presumably the fallback applies when the configured timeout is zero —
/// confirm against `sof_support::time_support::nonzero_duration_or`.
const DEFAULT_JITO_GRPC_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
/// Protobuf message used as the type of the `header` field of `Bundle`; declared here without
/// any fields.
///
/// NOTE(review): presumably a stand-in for the header message of the upstream block-engine
/// protos — confirm against the `.proto` definitions.
#[derive(Clone, PartialEq, ::prost::Message)]
struct SharedHeader {}
/// Protobuf message used as the type of the `flags` field of `PacketMeta`; declared here
/// without any fields.
///
/// NOTE(review): presumably a stand-in for the packet-flags message of the upstream protos —
/// confirm against the `.proto` definitions.
#[derive(Clone, PartialEq, ::prost::Message)]
struct PacketFlags {}
/// Protobuf metadata message attached to a `Packet` through its `meta` field.
#[derive(Clone, PartialEq, ::prost::Message)]
struct PacketMeta {
/// Protobuf field 1. `JitoGrpcTransport::packet_from_tx` sets this to the byte length of the
/// packet data.
#[prost(uint64, tag = "1")]
size: u64,
/// Protobuf field 2. Left as an empty string by `JitoGrpcTransport::packet_from_tx`.
#[prost(string, tag = "2")]
addr: String,
/// Protobuf field 3. Left as `0` by `JitoGrpcTransport::packet_from_tx`.
#[prost(uint32, tag = "3")]
port: u32,
/// Protobuf field 4 (optional message). Left as `None` by
/// `JitoGrpcTransport::packet_from_tx`.
#[prost(message, optional, tag = "4")]
flags: Option<PacketFlags>,
/// Protobuf field 5. Left as `0` by `JitoGrpcTransport::packet_from_tx`.
#[prost(uint64, tag = "5")]
sender_stake: u64,
}
/// Protobuf message carrying one byte payload plus its metadata; the element type of
/// `Bundle::packets`.
#[derive(Clone, PartialEq, ::prost::Message)]
struct Packet {
/// Protobuf field 1: the raw payload bytes. `JitoGrpcTransport::packet_from_tx` fills this
/// with a copy of the transaction bytes it is given.
#[prost(bytes = "vec", tag = "1")]
data: Vec<u8>,
/// Protobuf field 2: optional metadata describing `data`.
#[prost(message, optional, tag = "2")]
meta: Option<PacketMeta>,
}
/// Protobuf message grouping the packets sent in one `SendBundleRequest`.
///
/// NOTE(review): field numbering starts at 2; presumably tag 1 is reserved in the upstream
/// `.proto` — confirm before adding fields.
#[derive(Clone, PartialEq, ::prost::Message)]
struct Bundle {
/// Protobuf field 2: optional header. `JitoGrpcTransport::send_bundle_request` leaves it
/// as `None`.
#[prost(message, optional, tag = "2")]
header: Option<SharedHeader>,
/// Protobuf field 3: the packets that make up the bundle.
#[prost(message, repeated, tag = "3")]
packets: Vec<Packet>,
}
/// Request message encoded for the `/searcher.SearcherService/SendBundle` unary call issued by
/// `JitoGrpcTransport::send_bundle`.
#[derive(Clone, PartialEq, ::prost::Message)]
struct SendBundleRequest {
/// Protobuf field 1: the bundle to submit.
#[prost(message, optional, tag = "1")]
bundle: Option<Bundle>,
}
/// Response message decoded from the `/searcher.SearcherService/SendBundle` unary call issued
/// by `JitoGrpcTransport::send_bundle`.
#[derive(Clone, PartialEq, ::prost::Message)]
struct SendBundleResponse {
/// Protobuf field 1. `submit_jito` returns this value as the `bundle_id` of its
/// `JitoSubmitResponse`.
#[prost(string, tag = "1")]
uuid: String,
}
/// Jito submit transport that sends transactions over gRPC by calling
/// `/searcher.SearcherService/SendBundle` on a tonic channel.
#[derive(Debug, Clone)]
pub struct JitoGrpcTransport {
/// Channel produced by `Endpoint::connect_lazy` in `with_config`; `send_bundle` clones it
/// for every request.
channel: Channel,
}
impl JitoGrpcTransport {
    /// Creates a transport from [`JitoTransportConfig::default`].
    ///
    /// # Errors
    ///
    /// Returns whatever [`Self::with_config`] returns for the default configuration.
    pub fn new() -> Result<Self, SubmitTransportError> {
        Self::with_config(JitoTransportConfig::default())
    }

    /// Creates a transport for the block engine endpoint described by `config`.
    ///
    /// The configured request timeout is passed through `nonzero_duration_or` with
    /// `DEFAULT_JITO_GRPC_REQUEST_TIMEOUT` as the fallback; the resulting duration is used as
    /// both the connect timeout and the per-request timeout. `TCP_NODELAY` is enabled, and TLS
    /// with the webpki root certificates is configured when the endpoint URI uses the `https`
    /// scheme. The channel is created with `connect_lazy`.
    ///
    /// NOTE(review): the unit test for this constructor runs under `#[tokio::test]`;
    /// presumably `connect_lazy` needs an active Tokio runtime — confirm before calling this
    /// from synchronous start-up code.
    ///
    /// # Errors
    ///
    /// Returns [`SubmitTransportError::Config`] when the endpoint URL cannot be parsed or the
    /// TLS configuration is rejected.
    pub fn with_config(config: JitoTransportConfig) -> Result<Self, SubmitTransportError> {
        let JitoTransportConfig {
            endpoint: block_engine_endpoint,
            request_timeout,
        } = config;
        let request_timeout =
            nonzero_duration_or(request_timeout, DEFAULT_JITO_GRPC_REQUEST_TIMEOUT);
        let mut transport_endpoint =
            Endpoint::from_shared(block_engine_endpoint.as_url().to_owned())
                .map_err(|error| SubmitTransportError::Config {
                    message: error.to_string(),
                })?
                .connect_timeout(request_timeout)
                .timeout(request_timeout)
                .tcp_nodelay(true);
        // Decide on TLS from the parsed URI rather than the raw string: URI schemes are
        // case-insensitive, so a prefix test for "https://" would skip TLS setup for an
        // endpoint written as e.g. "HTTPS://..." even though it parses as an https URI.
        let uses_https = transport_endpoint.uri().scheme_str() == Some("https");
        if uses_https {
            transport_endpoint = transport_endpoint
                .tls_config(ClientTlsConfig::new().with_webpki_roots())
                .map_err(|error| SubmitTransportError::Config {
                    message: error.to_string(),
                })?;
        }
        Ok(Self {
            channel: transport_endpoint.connect_lazy(),
        })
    }

    /// Creates a transport for `endpoint`, taking every other setting from
    /// [`JitoTransportConfig::default`].
    ///
    /// # Errors
    ///
    /// Returns whatever [`Self::with_config`] returns for the resulting configuration.
    pub fn with_endpoint(endpoint: JitoBlockEngineEndpoint) -> Result<Self, SubmitTransportError> {
        Self::with_config(JitoTransportConfig {
            endpoint,
            ..JitoTransportConfig::default()
        })
    }

    /// Wraps serialized transaction bytes in a [`Packet`].
    ///
    /// The payload is copied into `data`; the metadata records the payload length in `size`
    /// and leaves the address, port, flags and sender stake at their empty/zero values.
    fn packet_from_tx(tx_bytes: &[u8]) -> Packet {
        Packet {
            data: tx_bytes.to_vec(),
            meta: Some(PacketMeta {
                size: tx_bytes.len() as u64,
                addr: String::new(),
                port: 0,
                flags: None,
                sender_stake: 0,
            }),
        }
    }

    /// Builds a [`SendBundleRequest`] whose bundle has no header and exactly one packet made
    /// from `tx_bytes`.
    fn send_bundle_request(tx_bytes: &[u8]) -> SendBundleRequest {
        SendBundleRequest {
            bundle: Some(Bundle {
                header: None,
                packets: vec![Self::packet_from_tx(tx_bytes)],
            }),
        }
    }

    /// Issues the `/searcher.SearcherService/SendBundle` unary call on a clone of the channel.
    ///
    /// # Errors
    ///
    /// Returns `Status::unavailable` carrying the readiness error text when the channel does
    /// not become ready, or the [`Status`] produced by the unary call itself.
    async fn send_bundle(&self, request: SendBundleRequest) -> Result<SendBundleResponse, Status> {
        let mut grpc = Grpc::new(self.channel.clone());
        grpc.ready()
            .await
            .map_err(|error| Status::unavailable(error.to_string()))?;
        let response = grpc
            .unary(
                Request::new(request),
                PathAndQuery::from_static("/searcher.SearcherService/SendBundle"),
                ProstCodec::<SendBundleRequest, SendBundleResponse>::default(),
            )
            .await?;
        Ok(response.into_inner())
    }
}
#[async_trait]
impl JitoSubmitTransport for JitoGrpcTransport {
async fn submit_jito(
&self,
tx_bytes: &[u8],
_config: &JitoSubmitConfig,
) -> Result<JitoSubmitResponse, SubmitTransportError> {
let response = self.send_bundle(Self::send_bundle_request(tx_bytes)).await;
let response = response.map_err(|error| SubmitTransportError::Failure {
message: error.to_string(),
})?;
Ok(JitoSubmitResponse {
transaction_signature: None,
bundle_id: Some(response.uuid),
})
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The packet carries a copy of the input bytes and reports their length in its metadata.
    #[test]
    fn packet_from_tx_uses_wire_length() {
        let tx = [1u8, 2, 3, 4];
        let Packet { data, meta } = JitoGrpcTransport::packet_from_tx(&tx);
        assert_eq!(data, tx.to_vec());
        assert_eq!(meta.map(|meta| meta.size), Some(4));
    }

    /// A request built from one transaction contains a bundle with exactly one packet.
    #[test]
    fn send_bundle_request_wraps_single_packet() {
        let request = JitoGrpcTransport::send_bundle_request(&[9, 8, 7]);
        let packet_count = request
            .bundle
            .map_or(0, |bundle| bundle.packets.len());
        assert_eq!(packet_count, 1);
    }

    /// Building a transport with a zero request timeout succeeds.
    #[tokio::test(flavor = "current_thread")]
    async fn jito_grpc_transport_accepts_zero_timeout_config() {
        let config = JitoTransportConfig {
            endpoint: JitoBlockEngineEndpoint::default(),
            request_timeout: Duration::ZERO,
        };
        assert!(JitoGrpcTransport::with_config(config).is_ok());
    }
}