1use std::time::Instant;
4
5use async_trait::async_trait;
6use tokio::{net::UdpSocket, time::timeout};
7
8use super::{DirectSubmitConfig, DirectSubmitTransport, SubmitTransportError};
9use crate::{providers::LeaderTarget, routing::RoutingPolicy};
10
/// Stateless direct-submit transport that delivers transaction bytes over UDP.
///
/// The type carries no fields; every call to its `DirectSubmitTransport`
/// implementation binds its own socket.
#[derive(Clone, Copy, Debug, Default)]
pub struct UdpDirectTransport;
14
15#[async_trait]
16impl DirectSubmitTransport for UdpDirectTransport {
17 async fn submit_direct(
18 &self,
19 tx_bytes: &[u8],
20 targets: &[LeaderTarget],
21 policy: RoutingPolicy,
22 config: &DirectSubmitConfig,
23 ) -> Result<LeaderTarget, SubmitTransportError> {
24 let config = config.clone().normalized();
25 if targets.is_empty() {
26 return Err(SubmitTransportError::Config {
27 message: "no targets provided".to_owned(),
28 });
29 }
30
31 let socket =
32 UdpSocket::bind("0.0.0.0:0")
33 .await
34 .map_err(|error| SubmitTransportError::Failure {
35 message: error.to_string(),
36 })?;
37
38 let deadline = Instant::now()
39 .checked_add(config.global_timeout)
40 .ok_or_else(|| SubmitTransportError::Failure {
41 message: "failed to calculate direct-submit deadline".to_owned(),
42 })?;
43 for _round in 0..config.direct_target_rounds {
44 for chunk in targets.chunks(policy.normalized().max_parallel_sends) {
45 for target in chunk {
46 let now = Instant::now();
47 if now >= deadline {
48 return Err(SubmitTransportError::Failure {
49 message: "global direct-submit timeout exceeded".to_owned(),
50 });
51 }
52 let remaining = deadline.saturating_duration_since(now);
53 let per_target = remaining.min(config.per_target_timeout);
54 let send_result =
55 timeout(per_target, socket.send_to(tx_bytes, target.tpu_addr)).await;
56 match send_result {
57 Ok(Ok(_bytes_sent)) => return Ok(target.clone()),
58 Ok(Err(_send_error)) => {}
59 Err(_elapsed) => {}
60 }
61 }
62 }
63 }
64
65 Err(SubmitTransportError::Failure {
66 message: "all direct targets failed".to_owned(),
67 })
68 }
69}