1use {
76 quinn::{
77 ClientConfig as QuinnClientConfig, ConnectError, Connection, ConnectionError, Endpoint,
78 IdleTimeout, TransportConfig, VarInt, WriteError, crypto::rustls::QuicClientConfig,
79 },
80 rand::Rng,
81 rcgen::{CertificateParams, CustomExtension, DistinguishedName, DnType, KeyPair},
82 rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer},
83 rustls::{
84 DigitallySignedStruct, SignatureScheme,
85 client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier},
86 pki_types::{ServerName, UnixTime},
87 },
88 std::{
89 net::{SocketAddr, ToSocketAddrs},
90 sync::{
91 Arc,
92 atomic::{AtomicU8, AtomicU64, Ordering},
93 },
94 time::Duration,
95 },
96 thiserror::Error,
97 tokio::{sync::Mutex, task::JoinHandle, time::timeout},
98 tracing::{info, warn},
99};
100
/// ALPN protocol identifier placed in the client's TLS configuration.
pub const LUNAR_LANDER_TPU_PROTOCOL_ID: &[u8] = b"lunar-lander-tpu";

/// Object identifier of the custom X.509 extension that is added to the
/// client certificate when `ClientOptions::mev_protect` is enabled.
const OID_MEV_PROTECT: &[u64] = &[2, 999, 1, 1];
/// NOTE(review): presumably the port the service listens on by default; it is
/// not referenced by the code visible in this file — confirm against callers.
pub const DEFAULT_PORT: u16 = 16_888;
/// NOTE(review): presumably the largest serialized transaction, in bytes, that
/// may be sent; the visible send path does not enforce it — confirm.
pub const MAX_WIRE_TX_BYTES: usize = 1232;
113
/// Tunables for connection establishment, QUIC transport behaviour and
/// reconnection.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ClientOptions {
    /// Upper bound for a single connection attempt; when it elapses the
    /// attempt fails with `ClientError::ConnectTimeout`.
    pub connect_timeout: Duration,
    /// Keep-alive interval handed to the QUIC transport configuration.
    /// Must be strictly less than `idle_timeout`, otherwise building the
    /// client configuration fails with `ClientError::InvalidTransport`.
    pub keepalive_interval: Duration,
    /// Maximum idle timeout handed to the QUIC transport configuration.
    pub idle_timeout: Duration,
    /// When `true`, the generated client certificate carries the custom
    /// extension identified by `OID_MEV_PROTECT`.
    /// NOTE(review): how the server interprets the extension is not visible here.
    pub mev_protect: bool,
    /// When `true`, a send that fails on a closed connection triggers a
    /// reconnect followed by one retry of the same payload.
    pub auto_reconnect: bool,
    /// When `true`, a background watchdog task is spawned at connect time; it
    /// waits for the connection to close and re-establishes it with backoff.
    pub proactive_reconnect: bool,
    /// Backoff base used after the first failed watchdog reconnect attempt;
    /// the base doubles after every further failure. The actual sleep is a
    /// random duration below the current base.
    pub reconnect_initial_backoff: Duration,
    /// Ceiling applied to the backoff base as it doubles.
    pub reconnect_max_backoff: Duration,
}
175
176impl Default for ClientOptions {
177 fn default() -> Self {
178 Self {
179 connect_timeout: Duration::from_secs(5),
180 keepalive_interval: Duration::from_secs(2),
181 idle_timeout: Duration::from_secs(6),
182 mev_protect: false,
183 auto_reconnect: true,
184 proactive_reconnect: true,
185 reconnect_initial_backoff: Duration::from_millis(250),
186 reconnect_max_backoff: Duration::from_secs(30),
187 }
188 }
189}
190
/// Coarse connection state reported by `LunarLanderQuicClient::health`.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ConnectionHealth {
    /// A connection has been established and no close has been observed since.
    Healthy,
    /// A close was observed and a reconnect attempt is under way.
    Reconnecting,
    /// No usable connection: a reconnect attempt failed, or the connection was
    /// found closed while automatic reconnection is disabled.
    Disconnected,
}
210
// `u8` encodings of `ConnectionHealth` as stored in `ClientInner::health`.
// `LunarLanderQuicClient::health` maps any value other than the first two to
// `ConnectionHealth::Disconnected`.
const HEALTH_HEALTHY: u8 = 0;
const HEALTH_RECONNECTING: u8 = 1;
const HEALTH_DISCONNECTED: u8 = 2;
214
/// Errors produced while configuring, connecting and sending.
#[derive(Debug, Error)]
pub enum ClientError {
    /// The API key was empty or consisted only of whitespace.
    #[error("api key must not be empty")]
    EmptyApiKey,
    /// The keep-alive interval was not strictly below the idle timeout.
    #[error(
        "keepalive_interval ({keepalive:?}) must be strictly less than idle_timeout ({idle:?}); \
         otherwise the QUIC connection idles out before the next keepalive can refresh it"
    )]
    InvalidTransport { keepalive: Duration, idle: Duration },
    /// The endpoint string could not be split into a host and a port.
    #[error("endpoint `{0}` must be host:port")]
    InvalidEndpoint(String),
    /// Converting the endpoint string to socket addresses failed.
    #[error("failed to resolve endpoint `{endpoint}`: {source}")]
    ResolveEndpoint {
        endpoint: String,
        #[source]
        source: std::io::Error,
    },
    /// Resolution succeeded but yielded no address.
    #[error("endpoint `{0}` resolved to no socket addresses")]
    NoResolvedAddress(String),
    /// Generating the key pair or the self-signed client certificate failed.
    #[error("failed to generate client certificate: {0}")]
    ClientCertificate(String),
    /// Assembling the TLS, transport or QUIC client configuration failed.
    #[error("failed to build QUIC client config: {0}")]
    ClientConfig(String),
    /// Binding the local UDP socket for the client endpoint failed.
    #[error("failed to bind local QUIC client endpoint: {0}")]
    ClientBind(#[source] std::io::Error),
    /// The endpoint refused to start a connection attempt.
    #[error("failed to start QUIC connect: {0}")]
    ConnectStart(#[from] ConnectError),
    /// The connection attempt did not complete within the configured timeout.
    #[error("timed out connecting after {0:?}")]
    ConnectTimeout(Duration),
    /// The connection attempt completed with a connection error.
    #[error("failed to establish QUIC connection: {0}")]
    Connect(String),
    /// Opening a unidirectional stream failed.
    #[error("failed to open uni stream: {0}")]
    OpenUni(String),
    /// Writing the payload to the stream failed.
    #[error("failed to write transaction payload: {0}")]
    Write(#[from] WriteError),
    /// Finishing the stream after the write failed.
    #[error("failed to finish uni stream: {0}")]
    Finish(String),
}
254
/// Result alias whose error type is fixed to [`ClientError`].
pub type Result<T> = std::result::Result<T, ClientError>;
256
/// QUIC client handle: owns the shared connection state and, when proactive
/// reconnection is enabled, the handle of the background watchdog task.
#[derive(Debug)]
pub struct LunarLanderQuicClient {
    /// State shared with the watchdog task.
    inner: Arc<ClientInner>,
    /// `Some` only when the watchdog was spawned; aborted on `close` and on drop.
    watchdog: Option<JoinHandle<()>>,
}
275
/// Connection state shared between the public client handle and the watchdog.
#[derive(Debug)]
struct ClientInner {
    /// Endpoint string exactly as supplied by the caller; used in log output.
    endpoint_label: String,
    /// Address resolved once at connect time and reused for every reconnect.
    server_addr: SocketAddr,
    /// Host part of the endpoint, passed as the server name when connecting.
    server_name: String,
    /// Options the client was created with.
    options: ClientOptions,
    /// Local QUIC endpoint from which all connections are opened.
    endpoint: Endpoint,
    /// Current connection; the lock is held while a replacement is swapped in.
    connection: Mutex<Connection>,
    /// Incremented each time the stored connection is replaced by a new one.
    reconnects_total: AtomicU64,
    /// One of the `HEALTH_*` constants.
    health: AtomicU8,
}
300
301impl LunarLanderQuicClient {
302 pub async fn connect(endpoint: impl Into<String>, api_key: impl Into<String>) -> Result<Self> {
307 Self::connect_with_options(endpoint, api_key, ClientOptions::default()).await
308 }
309
310 pub async fn connect_with_options(
315 endpoint: impl Into<String>,
316 api_key: impl Into<String>,
317 options: ClientOptions,
318 ) -> Result<Self> {
319 install_rustls_provider();
320
321 let endpoint_label = endpoint.into();
322 let api_key = api_key.into();
323 if api_key.trim().is_empty() {
324 return Err(ClientError::EmptyApiKey);
325 }
326
327 let (server_addr, server_name) = resolve_endpoint(&endpoint_label)?;
328
329 let endpoint_socket = Endpoint::client("0.0.0.0:0".parse().expect("valid client bind"))
330 .map_err(ClientError::ClientBind)?;
331 let client_config = build_client_config(&api_key, &options)?;
332 let mut endpoint = endpoint_socket;
333 endpoint.set_default_client_config(client_config);
334
335 let connection = connect_inner(
336 &endpoint,
337 server_addr,
338 &server_name,
339 options.connect_timeout,
340 )
341 .await?;
342
343 let proactive_reconnect = options.proactive_reconnect;
344 let inner = Arc::new(ClientInner {
345 endpoint_label,
346 server_addr,
347 server_name,
348 options,
349 endpoint,
350 connection: Mutex::new(connection),
351 reconnects_total: AtomicU64::new(0),
352 health: AtomicU8::new(HEALTH_HEALTHY),
353 });
354
355 let watchdog = if proactive_reconnect {
356 Some(tokio::spawn(watchdog_loop(Arc::clone(&inner))))
357 } else {
358 None
359 };
360
361 Ok(Self { inner, watchdog })
362 }
363
364 pub fn endpoint(&self) -> &str {
367 &self.inner.endpoint_label
368 }
369
370 pub fn remote_addr(&self) -> SocketAddr {
372 self.inner.server_addr
373 }
374
375 pub fn server_name(&self) -> &str {
377 &self.inner.server_name
378 }
379
380 pub fn reconnects_total(&self) -> u64 {
384 self.inner.reconnects_total.load(Ordering::Relaxed)
385 }
386
387 pub fn health(&self) -> ConnectionHealth {
390 match self.inner.health.load(Ordering::Acquire) {
391 HEALTH_HEALTHY => ConnectionHealth::Healthy,
392 HEALTH_RECONNECTING => ConnectionHealth::Reconnecting,
393 _ => ConnectionHealth::Disconnected,
394 }
395 }
396
397 pub async fn reconnect(&mut self) -> Result<()> {
411 let mut guard = self.inner.connection.lock().await;
417 let old = guard.clone();
418 old.close(VarInt::from_u32(0), b"manual_reconnect");
419
420 self.inner
421 .health
422 .store(HEALTH_RECONNECTING, Ordering::Release);
423 let fresh = match connect_inner(
424 &self.inner.endpoint,
425 self.inner.server_addr,
426 &self.inner.server_name,
427 self.inner.options.connect_timeout,
428 )
429 .await
430 {
431 Ok(connection) => connection,
432 Err(error) => {
433 self.inner
434 .health
435 .store(HEALTH_DISCONNECTED, Ordering::Release);
436 return Err(error);
437 }
438 };
439
440 *guard = fresh;
441 self.inner.reconnects_total.fetch_add(1, Ordering::Relaxed);
442 self.inner.health.store(HEALTH_HEALTHY, Ordering::Release);
443 Ok(())
444 }
445
446 pub async fn send_transaction(&self, payload: &[u8]) -> Result<()> {
458 let connection = { self.inner.connection.lock().await.clone() };
459 match send_on(&connection, payload).await {
460 Ok(()) => Ok(()),
461 Err(error) => {
462 let Some(close_reason) = connection.close_reason() else {
463 return Err(error);
466 };
467 if !self.inner.options.auto_reconnect {
468 if !self.inner.options.proactive_reconnect {
472 self.inner
473 .health
474 .store(HEALTH_DISCONNECTED, Ordering::Release);
475 }
476 return Err(error);
477 }
478 let new_connection = self
479 .inner
480 .reconnect_if_same(&connection, &close_reason)
481 .await?;
482 send_on(&new_connection, payload).await
483 }
484 }
485 }
486
487 pub async fn close(mut self) {
489 if let Some(watchdog) = self.watchdog.take() {
490 watchdog.abort();
491 }
492 {
493 let connection = self.inner.connection.lock().await;
494 connection.close(0u32.into(), b"client_closed");
495 }
496 self.inner.endpoint.close(0u32.into(), b"client_closed");
497 let _ = self.inner.endpoint.wait_idle().await;
498 }
499}
500
501impl Drop for LunarLanderQuicClient {
502 fn drop(&mut self) {
503 if let Some(watchdog) = self.watchdog.take() {
504 watchdog.abort();
505 }
506 }
507}
508
509impl ClientInner {
510 async fn reconnect_if_same(
515 self: &Arc<Self>,
516 dead: &Connection,
517 close_reason: &ConnectionError,
518 ) -> Result<Connection> {
519 let mut guard = self.connection.lock().await;
520 if guard.stable_id() == dead.stable_id() {
521 warn!(
522 server = %self.endpoint_label,
523 close_reason = %close_reason,
524 "lunar-lander QUIC connection closed; reconnecting"
525 );
526 self.health.store(HEALTH_RECONNECTING, Ordering::Release);
527 let fresh = match connect_inner(
528 &self.endpoint,
529 self.server_addr,
530 &self.server_name,
531 self.options.connect_timeout,
532 )
533 .await
534 {
535 Ok(connection) => connection,
536 Err(error) => {
537 if !self.options.proactive_reconnect {
540 self.health.store(HEALTH_DISCONNECTED, Ordering::Release);
541 }
542 return Err(error);
543 }
544 };
545 info!(
546 server = %self.endpoint_label,
547 "lunar-lander QUIC connection re-established"
548 );
549 *guard = fresh.clone();
550 self.reconnects_total.fetch_add(1, Ordering::Relaxed);
551 self.health.store(HEALTH_HEALTHY, Ordering::Release);
552 Ok(fresh)
553 } else {
554 Ok(guard.clone())
555 }
556 }
557}
558
559fn jittered(base: Duration) -> Duration {
563 let nanos = base.as_nanos();
564 if nanos == 0 {
565 return Duration::ZERO;
566 }
567 let bound = u64::try_from(nanos).unwrap_or(u64::MAX);
569 let pick = rand::rng().random_range(0..bound);
570 Duration::from_nanos(pick)
571}
572
573async fn watchdog_loop(inner: Arc<ClientInner>) {
580 loop {
581 let connection = { inner.connection.lock().await.clone() };
582 let close_reason = connection.closed().await;
583 warn!(
584 server = %inner.endpoint_label,
585 close_reason = %close_reason,
586 "lunar-lander QUIC watchdog observed connection close; reconnecting"
587 );
588 inner.health.store(HEALTH_RECONNECTING, Ordering::Release);
589
590 let mut next_backoff = inner.options.reconnect_initial_backoff;
591 loop {
592 {
596 let guard = inner.connection.lock().await;
597 if guard.stable_id() != connection.stable_id() {
598 inner.health.store(HEALTH_HEALTHY, Ordering::Release);
599 break;
600 }
601 }
602
603 match connect_inner(
604 &inner.endpoint,
605 inner.server_addr,
606 &inner.server_name,
607 inner.options.connect_timeout,
608 )
609 .await
610 {
611 Ok(fresh) => {
612 let mut guard = inner.connection.lock().await;
613 if guard.stable_id() == connection.stable_id() {
614 *guard = fresh;
615 inner.reconnects_total.fetch_add(1, Ordering::Relaxed);
616 info!(
617 server = %inner.endpoint_label,
618 "lunar-lander QUIC watchdog re-established connection"
619 );
620 }
621 inner.health.store(HEALTH_HEALTHY, Ordering::Release);
625 break;
626 }
627 Err(error) => {
628 let sleep_for = jittered(next_backoff);
629 warn!(
630 server = %inner.endpoint_label,
631 error = %error,
632 backoff_ms = sleep_for.as_millis() as u64,
633 "lunar-lander QUIC watchdog reconnect attempt failed; retrying"
634 );
635 tokio::time::sleep(sleep_for).await;
636 next_backoff = (next_backoff * 2).min(inner.options.reconnect_max_backoff);
637 }
638 }
639 }
640 }
641}
642
643async fn send_on(connection: &Connection, payload: &[u8]) -> Result<()> {
644 let mut stream = connection
645 .open_uni()
646 .await
647 .map_err(|error| ClientError::OpenUni(error.to_string()))?;
648 stream.write_all(payload).await?;
649 stream
650 .finish()
651 .map_err(|error| ClientError::Finish(error.to_string()))?;
652 Ok(())
653}
654
655fn install_rustls_provider() {
656 if rustls::crypto::CryptoProvider::get_default().is_some() {
657 return;
658 }
659
660 let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
661}
662
663fn build_client_config(api_key: &str, options: &ClientOptions) -> Result<QuinnClientConfig> {
664 if options.keepalive_interval >= options.idle_timeout {
669 return Err(ClientError::InvalidTransport {
670 keepalive: options.keepalive_interval,
671 idle: options.idle_timeout,
672 });
673 }
674
675 let key_pair =
676 KeyPair::generate().map_err(|error| ClientError::ClientCertificate(error.to_string()))?;
677 let mut params = CertificateParams::new(Vec::new())
678 .map_err(|error| ClientError::ClientCertificate(error.to_string()))?;
679 let mut distinguished_name = DistinguishedName::new();
680 distinguished_name.push(DnType::CommonName, api_key);
681 params.distinguished_name = distinguished_name;
682
683 if options.mev_protect {
684 let ext = CustomExtension::from_oid_content(OID_MEV_PROTECT, vec![0x01, 0x01, 0xFF]);
686 params.custom_extensions.push(ext);
687 }
688
689 let certificate = params
690 .self_signed(&key_pair)
691 .map_err(|error| ClientError::ClientCertificate(error.to_string()))?;
692
693 let private_key = PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(key_pair.serialize_der()));
694 let no_verifier = Arc::new(NoServerCertificateVerification::new());
695 let mut client_crypto = rustls::ClientConfig::builder()
696 .dangerous()
697 .with_custom_certificate_verifier(no_verifier)
698 .with_client_auth_cert(
699 vec![CertificateDer::from(certificate.der().to_vec())],
700 private_key,
701 )
702 .map_err(|error| ClientError::ClientConfig(error.to_string()))?;
703 client_crypto.alpn_protocols = vec![LUNAR_LANDER_TPU_PROTOCOL_ID.to_vec()];
704
705 let mut transport = TransportConfig::default();
706 transport.keep_alive_interval(Some(options.keepalive_interval));
707 transport.max_idle_timeout(Some(
708 IdleTimeout::try_from(options.idle_timeout)
709 .map_err(|error| ClientError::ClientConfig(error.to_string()))?,
710 ));
711
712 let mut client_config = QuinnClientConfig::new(Arc::new(
713 QuicClientConfig::try_from(client_crypto)
714 .map_err(|error| ClientError::ClientConfig(error.to_string()))?,
715 ));
716 client_config.transport_config(Arc::new(transport));
717 Ok(client_config)
718}
719
/// Server certificate verifier that accepts every server certificate.
///
/// The wrapped crypto provider supplies the signature algorithms used for the
/// handshake signature checks, which are still performed.
#[derive(Debug)]
struct NoServerCertificateVerification(Arc<rustls::crypto::CryptoProvider>);
722
723impl NoServerCertificateVerification {
724 fn new() -> Self {
725 let provider = rustls::crypto::CryptoProvider::get_default()
726 .expect("rustls crypto provider should be installed")
727 .clone();
728 Self(provider)
729 }
730}
731
732impl ServerCertVerifier for NoServerCertificateVerification {
733 fn verify_server_cert(
734 &self,
735 _end_entity: &CertificateDer<'_>,
736 _intermediates: &[CertificateDer<'_>],
737 _server_name: &ServerName<'_>,
738 _ocsp_response: &[u8],
739 _now: UnixTime,
740 ) -> std::result::Result<ServerCertVerified, rustls::Error> {
741 Ok(ServerCertVerified::assertion())
742 }
743
744 fn verify_tls12_signature(
745 &self,
746 message: &[u8],
747 cert: &CertificateDer<'_>,
748 dss: &DigitallySignedStruct,
749 ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
750 rustls::crypto::verify_tls12_signature(
751 message,
752 cert,
753 dss,
754 &self.0.signature_verification_algorithms,
755 )
756 }
757
758 fn verify_tls13_signature(
759 &self,
760 message: &[u8],
761 cert: &CertificateDer<'_>,
762 dss: &DigitallySignedStruct,
763 ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
764 rustls::crypto::verify_tls13_signature(
765 message,
766 cert,
767 dss,
768 &self.0.signature_verification_algorithms,
769 )
770 }
771
772 fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
773 self.0.signature_verification_algorithms.supported_schemes()
774 }
775}
776
777async fn connect_inner(
778 endpoint: &Endpoint,
779 server_addr: SocketAddr,
780 server_name: &str,
781 connect_timeout: Duration,
782) -> Result<Connection> {
783 let connecting = endpoint.connect(server_addr, server_name)?;
784 let connection = timeout(connect_timeout, connecting)
785 .await
786 .map_err(|_| ClientError::ConnectTimeout(connect_timeout))?
787 .map_err(|error: ConnectionError| ClientError::Connect(error.to_string()))?;
788 Ok(connection)
789}
790
791fn resolve_endpoint(endpoint: &str) -> Result<(SocketAddr, String)> {
792 let endpoint_host = host_from_endpoint(endpoint)?;
793 let server_addr = endpoint
794 .to_socket_addrs()
795 .map_err(|source| ClientError::ResolveEndpoint {
796 endpoint: endpoint.to_string(),
797 source,
798 })?
799 .next()
800 .ok_or_else(|| ClientError::NoResolvedAddress(endpoint.to_string()))?;
801 Ok((server_addr, endpoint_host))
802}
803
804fn host_from_endpoint(endpoint: &str) -> Result<String> {
805 if endpoint.starts_with('[') {
806 let close = endpoint
807 .find(']')
808 .ok_or_else(|| ClientError::InvalidEndpoint(endpoint.to_string()))?;
809 return Ok(endpoint[1..close].to_string());
810 }
811
812 endpoint
813 .rsplit_once(':')
814 .map(|(host, _)| host.to_string())
815 .ok_or_else(|| ClientError::InvalidEndpoint(endpoint.to_string()))
816}
817
#[cfg(test)]
mod tests {
    use super::*;

    /// The API key used by every configuration-building test.
    const TEST_API_KEY: &str = "test-api-key";

    // A `name:port` endpoint yields the name without the port.
    #[test]
    fn parses_host_from_ipv4_endpoint() {
        let host = host_from_endpoint("fra.lunar-lander.hellomoon.io:16888").unwrap();
        assert_eq!(host, "fra.lunar-lander.hellomoon.io");
    }

    // A bracketed IPv6 endpoint yields the address without the brackets.
    #[test]
    fn parses_host_from_ipv6_endpoint() {
        let host = host_from_endpoint("[::1]:16888").unwrap();
        assert_eq!(host, "::1");
    }

    #[test]
    fn default_options_have_mev_protect_disabled() {
        let ClientOptions { mev_protect, .. } = ClientOptions::default();
        assert!(!mev_protect);
    }

    #[test]
    fn default_options_enable_auto_reconnect() {
        let ClientOptions { auto_reconnect, .. } = ClientOptions::default();
        assert!(auto_reconnect);
    }

    #[test]
    fn default_options_enable_proactive_reconnect() {
        let ClientOptions {
            proactive_reconnect,
            ..
        } = ClientOptions::default();
        assert!(proactive_reconnect);
    }

    // The default backoff must have room to grow.
    #[test]
    fn default_backoff_grows_to_max() {
        let defaults = ClientOptions::default();
        assert!(defaults.reconnect_max_backoff > defaults.reconnect_initial_backoff);
    }

    #[test]
    fn jittered_returns_zero_for_zero_base() {
        let picked = jittered(Duration::ZERO);
        assert_eq!(picked, Duration::ZERO);
    }

    // The upper bound of the jitter range is exclusive.
    #[test]
    fn jittered_stays_below_base() {
        let base = Duration::from_millis(500);
        assert!((0..32).all(|_| jittered(base) < base));
    }

    #[test]
    fn build_client_config_without_mev_protect() {
        install_rustls_provider();
        let built = build_client_config(TEST_API_KEY, &ClientOptions::default());
        built.unwrap();
    }

    #[test]
    fn build_client_config_with_mev_protect() {
        install_rustls_provider();
        let mut options = ClientOptions::default();
        options.mev_protect = true;
        let built = build_client_config(TEST_API_KEY, &options);
        built.unwrap();
    }

    // Both "equal to" and "greater than" the idle timeout must be rejected.
    #[test]
    fn build_client_config_rejects_keepalive_at_or_above_idle() {
        install_rustls_provider();
        for keepalive_secs in [5, 10] {
            let options = ClientOptions {
                keepalive_interval: Duration::from_secs(keepalive_secs),
                idle_timeout: Duration::from_secs(5),
                ..ClientOptions::default()
            };
            assert!(matches!(
                build_client_config(TEST_API_KEY, &options),
                Err(ClientError::InvalidTransport { .. })
            ));
        }
    }
}