use {
quinn::{
ClientConfig as QuinnClientConfig, ConnectError, Connection, ConnectionError, Endpoint,
IdleTimeout, TransportConfig, VarInt, WriteError, crypto::rustls::QuicClientConfig,
},
rand::Rng,
rcgen::{CertificateParams, CustomExtension, DistinguishedName, DnType, KeyPair},
rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer},
rustls::{
DigitallySignedStruct, SignatureScheme,
client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier},
pki_types::{ServerName, UnixTime},
},
std::{
net::{SocketAddr, ToSocketAddrs},
sync::{
Arc,
atomic::{AtomicU8, AtomicU64, Ordering},
},
time::Duration,
},
thiserror::Error,
tokio::{sync::Mutex, task::JoinHandle, time::timeout},
tracing::{info, warn},
};
/// ALPN protocol identifier offered during the TLS handshake (set in `build_client_config`).
pub const LUNAR_LANDER_TPU_PROTOCOL_ID: &[u8] = b"lunar-lander-tpu";
/// OID of the custom X.509 extension added to the client certificate when
/// `ClientOptions::mev_protect` is enabled.
const OID_MEV_PROTECT: &[u64] = &[2, 999, 1, 1];
/// NOTE(review): presumably the default server port; it is not referenced by the visible
/// code, so callers must still spell the port out in the endpoint string.
pub const DEFAULT_PORT: u16 = 16_888;
/// NOTE(review): presumably the maximum serialized transaction size in bytes; the visible
/// send path does not enforce it — confirm whether callers are expected to check it.
pub const MAX_WIRE_TX_BYTES: usize = 1232;
/// Tunables for [`LunarLanderQuicClient`]; see the `Default` impl for the stock values.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ClientOptions {
/// Upper bound on a single QUIC connect attempt (initial connect and every reconnect).
pub connect_timeout: Duration,
/// QUIC keep-alive interval; must be strictly less than `idle_timeout`.
pub keepalive_interval: Duration,
/// QUIC max idle timeout applied to the transport config.
pub idle_timeout: Duration,
/// When set, the client certificate carries the custom MEV-protect extension.
pub mev_protect: bool,
/// When set, `send_transaction` reconnects and retries once if the connection is closed.
pub auto_reconnect: bool,
/// When set, a background watchdog task reconnects as soon as the connection closes.
pub proactive_reconnect: bool,
/// Backoff bound used by the watchdog after the first failed reconnect attempt.
pub reconnect_initial_backoff: Duration,
/// Cap applied to the watchdog backoff as it doubles after each failed attempt.
pub reconnect_max_backoff: Duration,
}
impl Default for ClientOptions {
    /// Stock configuration: 5 s connect timeout, 2 s keep-alive under a 6 s idle timeout,
    /// MEV protection off, both reconnect mechanisms on, and a watchdog backoff that
    /// starts at 250 ms and is capped at 30 s.
    fn default() -> Self {
        let secs = Duration::from_secs;
        Self {
            // Behaviour switches.
            mev_protect: false,
            auto_reconnect: true,
            proactive_reconnect: true,
            // Transport timing; the keep-alive must stay below the idle timeout.
            connect_timeout: secs(5),
            keepalive_interval: secs(2),
            idle_timeout: secs(6),
            // Watchdog retry pacing.
            reconnect_initial_backoff: Duration::from_millis(250),
            reconnect_max_backoff: secs(30),
        }
    }
}
/// Coarse connection state reported by `LunarLanderQuicClient::health`.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ConnectionHealth {
/// The most recent connect or reconnect attempt succeeded.
Healthy,
/// A reconnect attempt is in progress, or the watchdog is still retrying.
Reconnecting,
/// The connection is closed and the last recorded attempt (if any) failed.
Disconnected,
}
// Raw `u8` encodings of `ConnectionHealth`, stored in the `ClientInner::health` atomic.
// `LunarLanderQuicClient::health` maps any value other than the first two to `Disconnected`.
const HEALTH_HEALTHY: u8 = 0;
const HEALTH_RECONNECTING: u8 = 1;
const HEALTH_DISCONNECTED: u8 = 2;
/// Errors produced while configuring, connecting or sending over the QUIC client.
#[derive(Debug, Error)]
pub enum ClientError {
/// The API key was empty or whitespace-only.
#[error("api key must not be empty")]
EmptyApiKey,
/// `keepalive_interval` was not strictly below `idle_timeout`.
#[error(
"keepalive_interval ({keepalive:?}) must be strictly less than idle_timeout ({idle:?}); \
otherwise the QUIC connection idles out before the next keepalive can refresh it"
)]
InvalidTransport { keepalive: Duration, idle: Duration },
/// The endpoint string could not be split into host and port.
#[error("endpoint `{0}` must be host:port")]
InvalidEndpoint(String),
/// Socket-address resolution of the endpoint failed.
#[error("failed to resolve endpoint `{endpoint}`: {source}")]
ResolveEndpoint {
endpoint: String,
#[source]
source: std::io::Error,
},
/// Resolution succeeded but yielded no addresses.
#[error("endpoint `{0}` resolved to no socket addresses")]
NoResolvedAddress(String),
/// Generating the key pair or self-signed client certificate failed.
#[error("failed to generate client certificate: {0}")]
ClientCertificate(String),
/// Building the rustls/quinn client configuration failed.
#[error("failed to build QUIC client config: {0}")]
ClientConfig(String),
/// Binding the local UDP socket for the QUIC endpoint failed.
#[error("failed to bind local QUIC client endpoint: {0}")]
ClientBind(#[source] std::io::Error),
/// quinn rejected the connect request before any handshake started.
#[error("failed to start QUIC connect: {0}")]
ConnectStart(#[from] ConnectError),
/// The handshake did not complete within the configured connect timeout.
#[error("timed out connecting after {0:?}")]
ConnectTimeout(Duration),
/// The handshake failed; carries the stringified connection error.
#[error("failed to establish QUIC connection: {0}")]
Connect(String),
/// Opening the unidirectional stream for a transaction failed.
#[error("failed to open uni stream: {0}")]
OpenUni(String),
/// Writing the payload to the stream failed.
#[error("failed to write transaction payload: {0}")]
Write(#[from] WriteError),
/// Finishing the stream after the write failed.
#[error("failed to finish uni stream: {0}")]
Finish(String),
}
/// Result alias whose error type is [`ClientError`].
pub type Result<T> = std::result::Result<T, ClientError>;
/// QUIC client that sends each transaction payload on its own unidirectional stream.
///
/// Dropping the client aborts the background watchdog task, if one was spawned.
#[derive(Debug)]
pub struct LunarLanderQuicClient {
// State shared with the watchdog task.
inner: Arc<ClientInner>,
// Handle of the watchdog task; `None` when `proactive_reconnect` is disabled.
watchdog: Option<JoinHandle<()>>,
}
/// State shared between the public client handle and the watchdog task.
#[derive(Debug)]
struct ClientInner {
// Endpoint string exactly as supplied by the caller; used in logs and `endpoint()`.
endpoint_label: String,
// Address resolved once at connect time and reused for every reconnect.
server_addr: SocketAddr,
// Host part of the endpoint, passed to quinn as the server name.
server_name: String,
options: ClientOptions,
// Local QUIC endpoint; carries the default client config used by all connects.
endpoint: Endpoint,
// Current connection; replaced in place on reconnect.
connection: Mutex<Connection>,
// Number of successful reconnects since the client was created.
reconnects_total: AtomicU64,
// One of the `HEALTH_*` constants.
health: AtomicU8,
}
impl LunarLanderQuicClient {
/// Connects to `endpoint` with [`ClientOptions::default`].
///
/// Shorthand for [`Self::connect_with_options`]; the same errors apply.
pub async fn connect(endpoint: impl Into<String>, api_key: impl Into<String>) -> Result<Self> {
    let options = ClientOptions::default();
    Self::connect_with_options(endpoint, api_key, options).await
}
pub async fn connect_with_options(
endpoint: impl Into<String>,
api_key: impl Into<String>,
options: ClientOptions,
) -> Result<Self> {
install_rustls_provider();
let endpoint_label = endpoint.into();
let api_key = api_key.into();
if api_key.trim().is_empty() {
return Err(ClientError::EmptyApiKey);
}
let (server_addr, server_name) = resolve_endpoint(&endpoint_label)?;
let endpoint_socket = Endpoint::client("0.0.0.0:0".parse().expect("valid client bind"))
.map_err(ClientError::ClientBind)?;
let client_config = build_client_config(&api_key, &options)?;
let mut endpoint = endpoint_socket;
endpoint.set_default_client_config(client_config);
let connection = connect_inner(
&endpoint,
server_addr,
&server_name,
options.connect_timeout,
)
.await?;
let proactive_reconnect = options.proactive_reconnect;
let inner = Arc::new(ClientInner {
endpoint_label,
server_addr,
server_name,
options,
endpoint,
connection: Mutex::new(connection),
reconnects_total: AtomicU64::new(0),
health: AtomicU8::new(HEALTH_HEALTHY),
});
let watchdog = if proactive_reconnect {
Some(tokio::spawn(watchdog_loop(Arc::clone(&inner))))
} else {
None
};
Ok(Self { inner, watchdog })
}
/// Returns the endpoint string exactly as it was supplied when connecting.
pub fn endpoint(&self) -> &str {
    self.inner.endpoint_label.as_str()
}
/// Returns the server socket address resolved at connect time.
pub fn remote_addr(&self) -> SocketAddr {
    let shared = &self.inner;
    shared.server_addr
}
/// Returns the host part of the endpoint, which is used as the server name.
pub fn server_name(&self) -> &str {
    self.inner.server_name.as_str()
}
/// Returns how many times the connection has been successfully re-established.
pub fn reconnects_total(&self) -> u64 {
    let counter = &self.inner.reconnects_total;
    counter.load(Ordering::Relaxed)
}
pub fn health(&self) -> ConnectionHealth {
match self.inner.health.load(Ordering::Acquire) {
HEALTH_HEALTHY => ConnectionHealth::Healthy,
HEALTH_RECONNECTING => ConnectionHealth::Reconnecting,
_ => ConnectionHealth::Disconnected,
}
}
/// Closes the current connection and establishes a fresh one.
///
/// The connection lock is held for the whole attempt, so concurrent senders and the
/// watchdog wait for the outcome instead of racing to reconnect.
///
/// # Errors
///
/// Propagates the connect error. On failure the closed connection stays in place and
/// health becomes `Disconnected`, unless the proactive watchdog is enabled: then health
/// stays `Reconnecting`, because the watchdog observes the close and keeps retrying.
pub async fn reconnect(&mut self) -> Result<()> {
    let inner = &self.inner;
    let mut guard = inner.connection.lock().await;
    // `Connection::close` takes `&self`, so the handle does not need to be cloned.
    guard.close(VarInt::from_u32(0), b"manual_reconnect");
    inner.health.store(HEALTH_RECONNECTING, Ordering::Release);
    let fresh = match connect_inner(
        &inner.endpoint,
        inner.server_addr,
        &inner.server_name,
        inner.options.connect_timeout,
    )
    .await
    {
        Ok(connection) => connection,
        Err(error) => {
            // Same rule as `ClientInner::reconnect_if_same`: only report `Disconnected`
            // when nothing will retry in the background.
            if !inner.options.proactive_reconnect {
                inner.health.store(HEALTH_DISCONNECTED, Ordering::Release);
            }
            return Err(error);
        }
    };
    *guard = fresh;
    inner.reconnects_total.fetch_add(1, Ordering::Relaxed);
    inner.health.store(HEALTH_HEALTHY, Ordering::Release);
    Ok(())
}
pub async fn send_transaction(&self, payload: &[u8]) -> Result<()> {
let connection = { self.inner.connection.lock().await.clone() };
match send_on(&connection, payload).await {
Ok(()) => Ok(()),
Err(error) => {
let Some(close_reason) = connection.close_reason() else {
return Err(error);
};
if !self.inner.options.auto_reconnect {
if !self.inner.options.proactive_reconnect {
self.inner
.health
.store(HEALTH_DISCONNECTED, Ordering::Release);
}
return Err(error);
}
let new_connection = self
.inner
.reconnect_if_same(&connection, &close_reason)
.await?;
send_on(&new_connection, payload).await
}
}
}
pub async fn close(mut self) {
if let Some(watchdog) = self.watchdog.take() {
watchdog.abort();
}
{
let connection = self.inner.connection.lock().await;
connection.close(0u32.into(), b"client_closed");
}
self.inner.endpoint.close(0u32.into(), b"client_closed");
let _ = self.inner.endpoint.wait_idle().await;
}
}
impl Drop for LunarLanderQuicClient {
    /// Aborts the watchdog task, if one is still registered, when the client is dropped.
    fn drop(&mut self) {
        let Some(task) = self.watchdog.take() else {
            return;
        };
        task.abort();
    }
}
impl ClientInner {
    /// Replaces the stored connection, but only if it is still the one the caller saw die.
    ///
    /// `dead` is the connection whose send failed and `close_reason` is why it closed (used
    /// for logging only). If the stored connection already differs from `dead`, it is
    /// returned untouched. Otherwise a new connection is established while the lock is
    /// held, stored, and returned.
    ///
    /// On failure, health is set to `Disconnected` only when no watchdog is running;
    /// otherwise it stays `Reconnecting`.
    async fn reconnect_if_same(
        self: &Arc<Self>,
        dead: &Connection,
        close_reason: &ConnectionError,
    ) -> Result<Connection> {
        let mut current = self.connection.lock().await;
        // Someone else already swapped in a replacement: just hand it back.
        if current.stable_id() != dead.stable_id() {
            return Ok(current.clone());
        }
        warn!(
            server = %self.endpoint_label,
            close_reason = %close_reason,
            "lunar-lander QUIC connection closed; reconnecting"
        );
        self.health.store(HEALTH_RECONNECTING, Ordering::Release);
        let attempt = connect_inner(
            &self.endpoint,
            self.server_addr,
            &self.server_name,
            self.options.connect_timeout,
        )
        .await;
        let fresh = match attempt {
            Ok(connection) => connection,
            Err(error) => {
                if !self.options.proactive_reconnect {
                    self.health.store(HEALTH_DISCONNECTED, Ordering::Release);
                }
                return Err(error);
            }
        };
        info!(
            server = %self.endpoint_label,
            "lunar-lander QUIC connection re-established"
        );
        *current = fresh.clone();
        self.reconnects_total.fetch_add(1, Ordering::Relaxed);
        self.health.store(HEALTH_HEALTHY, Ordering::Release);
        Ok(fresh)
    }
}
/// Picks a random duration in the half-open range `[0, base)`.
///
/// Returns zero for a zero `base`. A `base` whose nanosecond count does not fit in `u64`
/// is treated as `u64::MAX` nanoseconds.
fn jittered(base: Duration) -> Duration {
    if base.is_zero() {
        return Duration::ZERO;
    }
    let upper_nanos = u64::try_from(base.as_nanos()).unwrap_or(u64::MAX);
    Duration::from_nanos(rand::rng().random_range(0..upper_nanos))
}
/// Background task that re-establishes the connection whenever it closes.
///
/// Each outer iteration waits for the current connection to close, then retries
/// connecting with a jittered backoff that doubles after every failure up to
/// `reconnect_max_backoff`. The loop never returns; it ends only when the task is aborted.
///
/// The connection lock is not held while connecting, so the send path may reconnect
/// concurrently. The stored connection is only replaced if it is still the one this task
/// saw close.
async fn watchdog_loop(inner: Arc<ClientInner>) {
    let max_backoff = inner.options.reconnect_max_backoff;
    loop {
        let connection = { inner.connection.lock().await.clone() };
        let close_reason = connection.closed().await;
        warn!(
            server = %inner.endpoint_label,
            close_reason = %close_reason,
            "lunar-lander QUIC watchdog observed connection close; reconnecting"
        );
        inner.health.store(HEALTH_RECONNECTING, Ordering::Release);
        // Clamp so a misconfigured initial backoff never exceeds the configured maximum.
        let mut next_backoff = inner.options.reconnect_initial_backoff.min(max_backoff);
        loop {
            {
                // Another path (send or manual reconnect) already replaced the connection.
                let guard = inner.connection.lock().await;
                if guard.stable_id() != connection.stable_id() {
                    inner.health.store(HEALTH_HEALTHY, Ordering::Release);
                    break;
                }
            }
            match connect_inner(
                &inner.endpoint,
                inner.server_addr,
                &inner.server_name,
                inner.options.connect_timeout,
            )
            .await
            {
                Ok(fresh) => {
                    let mut guard = inner.connection.lock().await;
                    // Re-check under the lock; if someone else won the race, `fresh` is
                    // simply dropped and their connection is kept.
                    if guard.stable_id() == connection.stable_id() {
                        *guard = fresh;
                        inner.reconnects_total.fetch_add(1, Ordering::Relaxed);
                        info!(
                            server = %inner.endpoint_label,
                            "lunar-lander QUIC watchdog re-established connection"
                        );
                    }
                    inner.health.store(HEALTH_HEALTHY, Ordering::Release);
                    break;
                }
                Err(error) => {
                    let sleep_for = jittered(next_backoff);
                    // Checked conversion instead of a silently truncating `as` cast.
                    let backoff_ms = u64::try_from(sleep_for.as_millis()).unwrap_or(u64::MAX);
                    warn!(
                        server = %inner.endpoint_label,
                        error = %error,
                        backoff_ms = backoff_ms,
                        "lunar-lander QUIC watchdog reconnect attempt failed; retrying"
                    );
                    tokio::time::sleep(sleep_for).await;
                    // `Duration * u32` panics on overflow; saturate before capping.
                    next_backoff = next_backoff.saturating_mul(2).min(max_backoff);
                }
            }
        }
    }
}
async fn send_on(connection: &Connection, payload: &[u8]) -> Result<()> {
let mut stream = connection
.open_uni()
.await
.map_err(|error| ClientError::OpenUni(error.to_string()))?;
stream.write_all(payload).await?;
stream
.finish()
.map_err(|error| ClientError::Finish(error.to_string()))?;
Ok(())
}
/// Installs the aws-lc-rs crypto provider as the rustls process default, unless a default
/// provider is already installed.
fn install_rustls_provider() {
    if rustls::crypto::CryptoProvider::get_default().is_none() {
        // The result is deliberately ignored, as this is a best-effort install.
        let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
    }
}
fn build_client_config(api_key: &str, options: &ClientOptions) -> Result<QuinnClientConfig> {
if options.keepalive_interval >= options.idle_timeout {
return Err(ClientError::InvalidTransport {
keepalive: options.keepalive_interval,
idle: options.idle_timeout,
});
}
let key_pair =
KeyPair::generate().map_err(|error| ClientError::ClientCertificate(error.to_string()))?;
let mut params = CertificateParams::new(Vec::new())
.map_err(|error| ClientError::ClientCertificate(error.to_string()))?;
let mut distinguished_name = DistinguishedName::new();
distinguished_name.push(DnType::CommonName, api_key);
params.distinguished_name = distinguished_name;
if options.mev_protect {
let ext = CustomExtension::from_oid_content(OID_MEV_PROTECT, vec![0x01, 0x01, 0xFF]);
params.custom_extensions.push(ext);
}
let certificate = params
.self_signed(&key_pair)
.map_err(|error| ClientError::ClientCertificate(error.to_string()))?;
let private_key = PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(key_pair.serialize_der()));
let no_verifier = Arc::new(NoServerCertificateVerification::new());
let mut client_crypto = rustls::ClientConfig::builder()
.dangerous()
.with_custom_certificate_verifier(no_verifier)
.with_client_auth_cert(
vec![CertificateDer::from(certificate.der().to_vec())],
private_key,
)
.map_err(|error| ClientError::ClientConfig(error.to_string()))?;
client_crypto.alpn_protocols = vec![LUNAR_LANDER_TPU_PROTOCOL_ID.to_vec()];
let mut transport = TransportConfig::default();
transport.keep_alive_interval(Some(options.keepalive_interval));
transport.max_idle_timeout(Some(
IdleTimeout::try_from(options.idle_timeout)
.map_err(|error| ClientError::ClientConfig(error.to_string()))?,
));
let mut client_config = QuinnClientConfig::new(Arc::new(
QuicClientConfig::try_from(client_crypto)
.map_err(|error| ClientError::ClientConfig(error.to_string()))?,
));
client_config.transport_config(Arc::new(transport));
Ok(client_config)
}
/// Server certificate verifier that accepts every certificate.
///
/// SECURITY: this disables server authentication. Handshake signatures are still checked
/// with the wrapped crypto provider's signature verification algorithms.
#[derive(Debug)]
struct NoServerCertificateVerification(Arc<rustls::crypto::CryptoProvider>);
impl NoServerCertificateVerification {
fn new() -> Self {
let provider = rustls::crypto::CryptoProvider::get_default()
.expect("rustls crypto provider should be installed")
.clone();
Self(provider)
}
}
impl ServerCertVerifier for NoServerCertificateVerification {
    /// Accepts any server certificate without inspecting it; every argument is ignored.
    fn verify_server_cert(
        &self,
        _end_entity: &CertificateDer<'_>,
        _intermediates: &[CertificateDer<'_>],
        _server_name: &ServerName<'_>,
        _ocsp_response: &[u8],
        _now: UnixTime,
    ) -> std::result::Result<ServerCertVerified, rustls::Error> {
        let accepted = ServerCertVerified::assertion();
        Ok(accepted)
    }

    /// Verifies a TLS 1.2 handshake signature with the provider's algorithms.
    fn verify_tls12_signature(
        &self,
        message: &[u8],
        cert: &CertificateDer<'_>,
        dss: &DigitallySignedStruct,
    ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
        let algorithms = &self.0.signature_verification_algorithms;
        rustls::crypto::verify_tls12_signature(message, cert, dss, algorithms)
    }

    /// Verifies a TLS 1.3 handshake signature with the provider's algorithms.
    fn verify_tls13_signature(
        &self,
        message: &[u8],
        cert: &CertificateDer<'_>,
        dss: &DigitallySignedStruct,
    ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
        let algorithms = &self.0.signature_verification_algorithms;
        rustls::crypto::verify_tls13_signature(message, cert, dss, algorithms)
    }

    /// Lists the signature schemes supported by the provider's verification algorithms.
    fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
        let algorithms = &self.0.signature_verification_algorithms;
        algorithms.supported_schemes()
    }
}
async fn connect_inner(
endpoint: &Endpoint,
server_addr: SocketAddr,
server_name: &str,
connect_timeout: Duration,
) -> Result<Connection> {
let connecting = endpoint.connect(server_addr, server_name)?;
let connection = timeout(connect_timeout, connecting)
.await
.map_err(|_| ClientError::ConnectTimeout(connect_timeout))?
.map_err(|error: ConnectionError| ClientError::Connect(error.to_string()))?;
Ok(connection)
}
/// Resolves `endpoint` (`host:port`) to its first socket address and extracts the host.
///
/// Returns `(address, host)`; the host part is what `host_from_endpoint` yields.
///
/// NOTE(review): `to_socket_addrs` is a blocking call and this function is reached from
/// an async constructor — confirm that is acceptable for callers.
///
/// # Errors
///
/// `InvalidEndpoint` if the host cannot be extracted, `ResolveEndpoint` if resolution
/// fails, and `NoResolvedAddress` if it yields nothing.
fn resolve_endpoint(endpoint: &str) -> Result<(SocketAddr, String)> {
    let endpoint_host = host_from_endpoint(endpoint)?;
    let mut candidates = match endpoint.to_socket_addrs() {
        Ok(addrs) => addrs,
        Err(source) => {
            return Err(ClientError::ResolveEndpoint {
                endpoint: endpoint.to_string(),
                source,
            });
        }
    };
    match candidates.next() {
        Some(server_addr) => Ok((server_addr, endpoint_host)),
        None => Err(ClientError::NoResolvedAddress(endpoint.to_string())),
    }
}
/// Extracts the host part of a `host:port` endpoint string.
///
/// Two forms are accepted:
/// - `[addr]:port`: the text between the brackets is returned (IPv6 literals).
/// - `host:port`: everything before the last `:` is returned.
///
/// The port itself is not validated here; resolution does that later.
///
/// # Errors
///
/// Returns [`ClientError::InvalidEndpoint`] when there is no `:port` separator, when a
/// bracketed host is unterminated or not followed by `:`, or when the host is empty.
fn host_from_endpoint(endpoint: &str) -> Result<String> {
    let invalid = || ClientError::InvalidEndpoint(endpoint.to_string());
    let host = if let Some(rest) = endpoint.strip_prefix('[') {
        let (host, after) = rest.split_once(']').ok_or_else(invalid)?;
        // The closing bracket must be followed by the `:port` suffix.
        if !after.starts_with(':') {
            return Err(invalid());
        }
        host
    } else {
        let (host, _port) = endpoint.rsplit_once(':').ok_or_else(invalid)?;
        host
    };
    if host.is_empty() {
        return Err(invalid());
    }
    Ok(host.to_string())
}
#[cfg(test)]
mod tests {
    use super::*;

    // Endpoint parsing.

    #[test]
    fn parses_host_from_ipv4_endpoint() {
        let host = host_from_endpoint("fra.lunar-lander.hellomoon.io:16888").unwrap();
        assert_eq!(host, "fra.lunar-lander.hellomoon.io");
    }

    #[test]
    fn parses_host_from_ipv6_endpoint() {
        let host = host_from_endpoint("[::1]:16888").unwrap();
        assert_eq!(host, "::1");
    }

    // Default options.

    #[test]
    fn default_options_have_mev_protect_disabled() {
        let ClientOptions { mev_protect, .. } = ClientOptions::default();
        assert!(!mev_protect);
    }

    #[test]
    fn default_options_enable_auto_reconnect() {
        let ClientOptions { auto_reconnect, .. } = ClientOptions::default();
        assert!(auto_reconnect);
    }

    #[test]
    fn default_options_enable_proactive_reconnect() {
        let ClientOptions {
            proactive_reconnect,
            ..
        } = ClientOptions::default();
        assert!(proactive_reconnect);
    }

    #[test]
    fn default_backoff_grows_to_max() {
        let defaults = ClientOptions::default();
        assert!(defaults.reconnect_initial_backoff < defaults.reconnect_max_backoff);
    }

    // Jitter.

    #[test]
    fn jittered_returns_zero_for_zero_base() {
        assert_eq!(jittered(Duration::ZERO), Duration::ZERO);
    }

    #[test]
    fn jittered_stays_below_base() {
        let base = Duration::from_millis(500);
        for _ in 0..32 {
            let picked = jittered(base);
            assert!(picked < base);
        }
    }

    // Client config construction.

    #[test]
    fn build_client_config_without_mev_protect() {
        install_rustls_provider();
        let defaults = ClientOptions::default();
        build_client_config("test-api-key", &defaults).unwrap();
    }

    #[test]
    fn build_client_config_with_mev_protect() {
        install_rustls_provider();
        let mut options = ClientOptions::default();
        options.mev_protect = true;
        build_client_config("test-api-key", &options).unwrap();
    }

    #[test]
    fn build_client_config_rejects_keepalive_at_or_above_idle() {
        install_rustls_provider();
        // (keepalive, idle) pairs: equal, then keepalive above idle.
        for (keepalive_secs, idle_secs) in [(5, 5), (10, 5)] {
            let options = ClientOptions {
                keepalive_interval: Duration::from_secs(keepalive_secs),
                idle_timeout: Duration::from_secs(idle_secs),
                ..ClientOptions::default()
            };
            let outcome = build_client_config("test-api-key", &options);
            assert!(matches!(
                outcome,
                Err(ClientError::InvalidTransport { .. })
            ));
        }
    }
}