Skip to main content

sof_tx/submit/
direct.rs

1//! UDP direct-submit transport implementation.
2
3use std::time::Instant;
4
5use async_trait::async_trait;
6use tokio::{net::UdpSocket, time::timeout};
7
8use super::{DirectSubmitConfig, DirectSubmitTransport, SubmitTransportError};
9use crate::{providers::LeaderTarget, routing::RoutingPolicy};
10
11/// UDP-based direct transport that sends transaction bytes to TPU targets.
12#[derive(Debug, Default, Clone, Copy)]
13pub struct UdpDirectTransport;
14
15#[async_trait]
16impl DirectSubmitTransport for UdpDirectTransport {
17    async fn submit_direct(
18        &self,
19        tx_bytes: &[u8],
20        targets: &[LeaderTarget],
21        policy: RoutingPolicy,
22        config: &DirectSubmitConfig,
23    ) -> Result<LeaderTarget, SubmitTransportError> {
24        let config = config.clone().normalized();
25        if targets.is_empty() {
26            return Err(SubmitTransportError::Config {
27                message: "no targets provided".to_owned(),
28            });
29        }
30
31        let socket =
32            UdpSocket::bind("0.0.0.0:0")
33                .await
34                .map_err(|error| SubmitTransportError::Failure {
35                    message: error.to_string(),
36                })?;
37
38        let deadline = Instant::now()
39            .checked_add(config.global_timeout)
40            .ok_or_else(|| SubmitTransportError::Failure {
41                message: "failed to calculate direct-submit deadline".to_owned(),
42            })?;
43        for _round in 0..config.direct_target_rounds {
44            for chunk in targets.chunks(policy.normalized().max_parallel_sends) {
45                for target in chunk {
46                    let now = Instant::now();
47                    if now >= deadline {
48                        return Err(SubmitTransportError::Failure {
49                            message: "global direct-submit timeout exceeded".to_owned(),
50                        });
51                    }
52                    let remaining = deadline.saturating_duration_since(now);
53                    let per_target = remaining.min(config.per_target_timeout);
54                    let send_result =
55                        timeout(per_target, socket.send_to(tx_bytes, target.tpu_addr)).await;
56                    match send_result {
57                        Ok(Ok(_bytes_sent)) => return Ok(target.clone()),
58                        Ok(Err(_send_error)) => {}
59                        Err(_elapsed) => {}
60                    }
61                }
62            }
63        }
64
65        Err(SubmitTransportError::Failure {
66            message: "all direct targets failed".to_owned(),
67        })
68    }
69}