use crate::async_carrier::{self, AsyncCommandSender, DemandBatcher};
use datum::{Flow, Keep, NotUsed, Sink, Source, StreamCompletion, StreamError, StreamResult};
use std::net::SocketAddr;
use std::sync::{Arc, Mutex, mpsc as std_mpsc};
use tokio::net::{ToSocketAddrs, UdpSocket};
use tokio::runtime::Handle;
use tokio::sync::mpsc as tokio_mpsc;
use tokio::task::JoinHandle;
/// Default `max_datagram_size` (in bytes) used by the `*_default` constructors
/// and by `send_sink`; the carrier task allocates a receive buffer of this size.
pub const DEFAULT_MAX_DATAGRAM_SIZE: usize = 65_536;
/// Default `receive_buffer` used by the `*_default` constructors; it sizes the
/// demand batcher and the queue between the carrier task and the source.
pub const DEFAULT_RECEIVE_BUFFER: usize = 64;
/// A UDP payload paired with the address of the remote endpoint.
///
/// For received datagrams `remote` is the sender; for datagrams handed to a
/// send flow it is the destination passed to `send_to`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Datagram {
    /// Raw bytes carried by the datagram.
    pub payload: Vec<u8>,
    /// Address of the remote endpoint.
    pub remote: SocketAddr,
}

impl Datagram {
    /// Builds a datagram from anything convertible into a byte vector.
    #[must_use]
    pub fn new(payload: impl Into<Vec<u8>>, remote: SocketAddr) -> Self {
        let payload: Vec<u8> = payload.into();
        Self { payload, remote }
    }

    /// Borrows the payload bytes.
    #[must_use]
    pub fn payload(&self) -> &[u8] {
        self.payload.as_slice()
    }

    /// Returns the remote address.
    #[must_use]
    pub fn remote(&self) -> SocketAddr {
        let Self { remote, .. } = self;
        *remote
    }

    /// Consumes the datagram and returns `(payload, remote)`.
    #[must_use]
    pub fn into_parts(self) -> (Vec<u8>, SocketAddr) {
        let Self { payload, remote } = self;
        (payload, remote)
    }

    /// Consumes the datagram and returns only the payload.
    #[must_use]
    pub fn into_payload(self) -> Vec<u8> {
        let Self { payload, .. } = self;
        payload
    }
}
/// Materialized description of a bound UDP socket.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct UdpBinding {
    /// Local address the socket reported after binding.
    pub local_addr: SocketAddr,
}

impl UdpBinding {
    /// Returns the local address of the bound socket.
    #[must_use]
    pub fn local_addr(&self) -> SocketAddr {
        let Self { local_addr } = *self;
        local_addr
    }
}
/// Materialized description of a connected UDP socket.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct UdpConnection {
    /// Local address the socket reported after binding.
    pub local_addr: SocketAddr,
    /// Peer address the socket reported after connecting.
    pub remote_addr: SocketAddr,
}

impl UdpConnection {
    /// Returns the local address of the socket.
    #[must_use]
    pub fn local_addr(&self) -> SocketAddr {
        let Self { local_addr, .. } = *self;
        local_addr
    }

    /// Returns the peer address of the socket.
    #[must_use]
    pub fn remote_addr(&self) -> SocketAddr {
        let Self { remote_addr, .. } = *self;
        remote_addr
    }
}
/// Unit type whose associated functions build Tokio-backed UDP sources,
/// sinks and flows.
pub struct TokioUdp;
/// Alias for [`TokioUdp`].
pub type Udp = TokioUdp;
/// Message placed on the receive queue by the carrier task and read by the
/// receiving source.
enum ReceiveResponse<T> {
/// A successfully received item.
Item(T),
/// An error reported by the carrier task; the source surfaces it as `Err`.
Error(StreamError),
}
/// Result of trying to enqueue a received item without blocking.
enum QueueOutcome {
/// The item was placed on the receive queue.
Queued,
/// The receive queue was full, so the item was discarded.
Dropped,
/// There is no receive queue, or its receiving end has been dropped.
Closed,
}
/// State held by a receiving source while it is running.
struct ReceiveResource<T> {
// Queue filled by the carrier task with items and errors.
receiver: std_mpsc::Receiver<ReceiveResponse<T>>,
// Handle used to send demand and close requests to the carrier task.
carrier: UdpCarrier<T>,
// Tracks consumed items and decides when more demand should be requested.
demand: DemandBatcher,
}
/// Asks the carrier to stop reading whenever the receive resource goes away.
impl<T> Drop for ReceiveResource<T> {
    fn drop(&mut self) {
        // Best effort: `close_read` ignores a failed enqueue.
        let carrier = &self.carrier;
        carrier.close_read();
    }
}
/// Commands sent from stream stages to the carrier task.
enum UdpCarrierCommand<T> {
/// Adds this many items to the number the task is allowed to receive.
Demand(usize),
/// Sends a single item on the socket.
SendOne(T),
/// Sends every item of the batch on the socket, in order.
SendBatch(Vec<T>),
/// Marks the read side as closed.
CloseRead,
/// Marks the write side as closed and acknowledges on `ack`.
CloseWrite {
ack: std_mpsc::Sender<StreamResult<()>>,
},
}
/// Cloneable handle to a carrier task; all clones share one `UdpCarrierInner`.
#[derive(Clone)]
struct UdpCarrier<T> {
inner: Arc<UdpCarrierInner<T>>,
}
/// Shared state behind every `UdpCarrier` clone.
struct UdpCarrierInner<T> {
// Command queue feeding the carrier task.
commands: AsyncCommandSender<UdpCarrierCommand<T>>,
// Errors reported by the carrier task, polled by `check_send_error`.
send_errors: Mutex<std_mpsc::Receiver<StreamError>>,
// Join handle of the spawned carrier task; aborted when this value is dropped.
task: Mutex<Option<JoinHandle<()>>>,
}
/// Aborts the carrier task once the last `UdpCarrier` handle is gone.
impl<T> Drop for UdpCarrierInner<T> {
    fn drop(&mut self) {
        // `&mut self` already guarantees exclusive access, so the mutex does not
        // need to be locked. Recovering from poisoning instead of `expect`ing
        // keeps `drop` from panicking (a panic while unwinding would abort the
        // process) and still lets the task be aborted.
        let slot = self
            .task
            .get_mut()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        if let Some(task) = slot.take() {
            task.abort();
        }
    }
}
impl<T> UdpCarrier<T> {
    /// Requests that the carrier task stop reading.
    ///
    /// Uses the non-blocking `try_send`; a failed enqueue is deliberately
    /// ignored because this is also called from `Drop`.
    fn close_read(&self) {
        let request = UdpCarrierCommand::CloseRead;
        let _ = self.inner.commands.try_send(request);
    }
}
impl<T> UdpCarrier<T>
where
    T: Send + 'static,
{
    /// Tells the carrier task that `demand` more items may be received.
    ///
    /// # Errors
    /// Returns whatever error `send_or_blocking` reports for the command queue.
    fn request_demand(&self, demand: usize) -> StreamResult<()> {
        self.inner
            .commands
            .send_or_blocking(UdpCarrierCommand::Demand(demand))
    }

    /// Queues a batch of items to be sent on the socket.
    ///
    /// # Errors
    /// Returns whatever error `send_or_blocking` reports for the command queue.
    fn send_items(&self, items: Vec<T>) -> StreamResult<()> {
        self.inner
            .commands
            .send_or_blocking(UdpCarrierCommand::SendBatch(items))
    }

    /// Queues a single item to be sent on the socket.
    ///
    /// # Errors
    /// Returns whatever error `send_or_blocking` reports for the command queue.
    fn send_one(&self, item: T) -> StreamResult<()> {
        self.inner
            .commands
            .send_or_blocking(UdpCarrierCommand::SendOne(item))
    }

    /// Closes the write side and waits for the carrier task to acknowledge.
    ///
    /// Blocks the calling thread on the acknowledgement channel.
    ///
    /// # Errors
    /// Returns a previously reported carrier error if one is pending, the error
    /// from enqueueing the command, the result carried by the acknowledgement,
    /// or `StreamError::AbruptTermination` when the task went away without
    /// acknowledging and reported no error.
    fn close_write(&self) -> StreamResult<()> {
        self.check_send_error()?;
        let (ack_sender, ack_receiver) = std_mpsc::channel();
        let enqueue = self
            .inner
            .commands
            .send_or_blocking(UdpCarrierCommand::CloseWrite { ack: ack_sender });
        if let Err(error) = enqueue {
            // The queue may be closed because the task already failed on an
            // earlier send; surface that specific error when there is one.
            self.check_send_error()?;
            return Err(error);
        }
        match ack_receiver.recv() {
            Ok(result) => result?,
            Err(_) => {
                // The task dropped the queued command without acknowledging.
                // It reports errors before exiting, so a pending error here is
                // the real cause and must not be masked by the generic one.
                self.check_send_error()?;
                return Err(abrupt_termination());
            }
        }
        self.check_send_error()
    }

    /// Returns the next pending error reported by the carrier task, if any.
    ///
    /// The error is removed from the queue, so each reported error is returned
    /// at most once. An empty or disconnected queue yields `Ok(())`.
    ///
    /// # Panics
    /// Panics if the error-receiver mutex is poisoned.
    fn check_send_error(&self) -> StreamResult<()> {
        let pending = self
            .inner
            .send_errors
            .lock()
            .expect("UDP carrier send error receiver poisoned")
            .try_recv();
        match pending {
            Ok(error) => Err(error),
            Err(std_mpsc::TryRecvError::Empty) | Err(std_mpsc::TryRecvError::Disconnected) => {
                Ok(())
            }
        }
    }
}
/// State held by a send flow while it is running.
struct SendResource<T> {
// Handle used to pass items to the carrier task.
carrier: UdpCarrier<T>,
// Items buffered until a full batch is available (unused when batch_size <= 1).
pending: Vec<T>,
// Number of items per batch; values <= 1 make every item go out immediately.
batch_size: usize,
}
/// What the `start_*_carrier` functions return: the carrier handle plus the
/// receive queue, which is `Some` only when the carrier was started with its
/// read side open.
type UdpCarrierParts<T> = (
UdpCarrier<T>,
Option<std_mpsc::Receiver<ReceiveResponse<T>>>,
);
/// Converts an I/O error into `StreamError::Failed` carrying its display text.
fn io_error(error: std::io::Error) -> StreamError {
    let message = error.to_string();
    StreamError::Failed(message)
}
/// Error used when the carrier task disappears without reporting a cause.
fn abrupt_termination() -> StreamError {
    StreamError::AbruptTermination
}
impl TokioUdp {
    /// Shared precondition check for the sizing parameters.
    ///
    /// # Panics
    /// Panics if `max_datagram_size` or `receive_buffer` is zero.
    fn require_positive_limits(max_datagram_size: usize, receive_buffer: usize) {
        assert!(
            max_datagram_size > 0,
            "maximum datagram size must be greater than zero"
        );
        assert!(
            receive_buffer > 0,
            "receive buffer must be greater than zero"
        );
    }

    /// Builds a source that binds a UDP socket to `addr` and emits every
    /// datagram it receives.
    ///
    /// `max_datagram_size` is the size of the receive buffer in bytes and
    /// `receive_buffer` sizes the queue between the socket task and the stream.
    /// The socket is bound inside the future handed to
    /// `Source::lazy_future_source`, which calls `Handle::current()` and so has
    /// to run in a Tokio runtime context.
    ///
    /// # Panics
    /// Panics if `max_datagram_size` or `receive_buffer` is zero.
    #[must_use]
    pub fn bind<A>(
        addr: A,
        max_datagram_size: usize,
        receive_buffer: usize,
    ) -> Source<Datagram, StreamCompletion<UdpBinding>>
    where
        A: ToSocketAddrs + Clone + Send + Sync + 'static,
    {
        Self::require_positive_limits(max_datagram_size, receive_buffer);
        Source::lazy_future_source(move || {
            let bind_addr = addr.clone();
            async move {
                let runtime = Handle::current();
                let socket = UdpSocket::bind(bind_addr).await.map_err(io_error)?;
                let bound_addr = socket.local_addr().map_err(io_error)?;
                Ok(datagram_source_from_socket(
                    socket,
                    bound_addr,
                    runtime,
                    max_datagram_size,
                    receive_buffer,
                ))
            }
        })
    }

    /// [`TokioUdp::bind`] with [`DEFAULT_MAX_DATAGRAM_SIZE`] and
    /// [`DEFAULT_RECEIVE_BUFFER`].
    #[must_use]
    pub fn bind_default<A>(addr: A) -> Source<Datagram, StreamCompletion<UdpBinding>>
    where
        A: ToSocketAddrs + Clone + Send + Sync + 'static,
    {
        Self::bind(addr, DEFAULT_MAX_DATAGRAM_SIZE, DEFAULT_RECEIVE_BUFFER)
    }

    /// Builds a sink that binds a UDP socket to `local_addr` and sends every
    /// incoming datagram to its own `remote` address.
    ///
    /// The carrier is started write-only and datagrams are forwarded one at a
    /// time (batch size 1).
    #[must_use]
    pub fn send_sink<A>(local_addr: A) -> Sink<Datagram, StreamCompletion<NotUsed>>
    where
        A: ToSocketAddrs + Clone + Send + Sync + 'static,
    {
        Flow::<Datagram, NotUsed>::future_flow(move || {
            let bind_addr = local_addr.clone();
            async move {
                let runtime = Handle::current();
                let socket = UdpSocket::bind(bind_addr).await.map_err(io_error)?;
                // Read side is closed, so the receiver half is always `None`.
                let (carrier, _no_receiver) = start_datagram_carrier(
                    socket,
                    runtime,
                    DEFAULT_MAX_DATAGRAM_SIZE,
                    1,
                    false,
                    true,
                );
                Ok(datagram_send_flow_from_carrier(carrier, 1))
            }
        })
        .to_mat(Sink::ignore(), Keep::right)
    }

    /// Builds a flow over one bound UDP socket: datagrams entering the flow are
    /// sent, datagrams received on the socket are emitted.
    ///
    /// # Panics
    /// Panics if `max_datagram_size` or `receive_buffer` is zero.
    #[must_use]
    pub fn bind_flow<A>(
        addr: A,
        max_datagram_size: usize,
        receive_buffer: usize,
    ) -> Flow<Datagram, Datagram, StreamCompletion<UdpBinding>>
    where
        A: ToSocketAddrs + Clone + Send + Sync + 'static,
    {
        Self::require_positive_limits(max_datagram_size, receive_buffer);
        Flow::<Datagram, Datagram>::future_flow(move || {
            let bind_addr = addr.clone();
            async move {
                let runtime = Handle::current();
                let socket = UdpSocket::bind(bind_addr).await.map_err(io_error)?;
                let local_addr = socket.local_addr().map_err(io_error)?;
                let (carrier, receiver) = start_datagram_carrier(
                    socket,
                    runtime,
                    max_datagram_size,
                    receive_buffer,
                    true,
                    true,
                );
                let outbound = datagram_send_flow_from_carrier(carrier.clone(), 1)
                    .to_mat(Sink::ignore(), Keep::right);
                let inbound = datagram_source_from_carrier(
                    carrier,
                    receiver.expect("UDP bind_flow receiver exists"),
                    local_addr,
                    receive_buffer,
                );
                Ok(Flow::from_sink_and_source(outbound, inbound)
                    .map_materialized_value(move |_| UdpBinding { local_addr }))
            }
        })
    }

    /// [`TokioUdp::bind_flow`] with [`DEFAULT_MAX_DATAGRAM_SIZE`] and
    /// [`DEFAULT_RECEIVE_BUFFER`].
    #[must_use]
    pub fn bind_flow_default<A>(addr: A) -> Flow<Datagram, Datagram, StreamCompletion<UdpBinding>>
    where
        A: ToSocketAddrs + Clone + Send + Sync + 'static,
    {
        Self::bind_flow(addr, DEFAULT_MAX_DATAGRAM_SIZE, DEFAULT_RECEIVE_BUFFER)
    }

    /// Builds a flow over a UDP socket bound to `local_addr` and connected to
    /// `peer`: payloads entering the flow are sent to the peer, payloads
    /// received on the socket are emitted.
    ///
    /// # Panics
    /// Panics if `max_datagram_size` or `receive_buffer` is zero.
    #[must_use]
    pub fn connect<A, P>(
        local_addr: A,
        peer: P,
        max_datagram_size: usize,
        receive_buffer: usize,
    ) -> Flow<Vec<u8>, Vec<u8>, StreamCompletion<UdpConnection>>
    where
        A: ToSocketAddrs + Clone + Send + Sync + 'static,
        P: ToSocketAddrs + Clone + Send + Sync + 'static,
    {
        Self::require_positive_limits(max_datagram_size, receive_buffer);
        Flow::<Vec<u8>, Vec<u8>>::future_flow(move || {
            let bind_addr = local_addr.clone();
            let peer_addr = peer.clone();
            async move {
                let runtime = Handle::current();
                let socket = UdpSocket::bind(bind_addr).await.map_err(io_error)?;
                socket.connect(peer_addr).await.map_err(io_error)?;
                let local = socket.local_addr().map_err(io_error)?;
                let remote = socket.peer_addr().map_err(io_error)?;
                let connection = UdpConnection {
                    local_addr: local,
                    remote_addr: remote,
                };
                let (carrier, receiver) = start_connected_carrier(
                    socket,
                    runtime,
                    max_datagram_size,
                    receive_buffer,
                    true,
                    true,
                );
                let outbound = connected_send_flow_from_carrier(carrier.clone(), 1)
                    .to_mat(Sink::ignore(), Keep::right);
                let inbound = connected_source_from_carrier(
                    carrier,
                    receiver.expect("connected UDP receiver exists"),
                    receive_buffer,
                );
                Ok(Flow::from_sink_and_source(outbound, inbound)
                    .map_materialized_value(move |_| connection))
            }
        })
    }

    /// [`TokioUdp::connect`] with [`DEFAULT_MAX_DATAGRAM_SIZE`] and
    /// [`DEFAULT_RECEIVE_BUFFER`].
    #[must_use]
    pub fn connect_default<A, P>(
        local_addr: A,
        peer: P,
    ) -> Flow<Vec<u8>, Vec<u8>, StreamCompletion<UdpConnection>>
    where
        A: ToSocketAddrs + Clone + Send + Sync + 'static,
        P: ToSocketAddrs + Clone + Send + Sync + 'static,
    {
        Self::connect(
            local_addr,
            peer,
            DEFAULT_MAX_DATAGRAM_SIZE,
            DEFAULT_RECEIVE_BUFFER,
        )
    }
}
/// Starts a read-only carrier on `socket` and wraps it in a datagram source
/// that materializes a `UdpBinding` for `local_addr`.
///
/// # Panics
/// Panics if the carrier does not return a receiver; it is started with the
/// read side open.
fn datagram_source_from_socket(
    socket: UdpSocket,
    local_addr: SocketAddr,
    handle: Handle,
    max_datagram_size: usize,
    receive_buffer: usize,
) -> Source<Datagram, UdpBinding> {
    let read_open = true;
    let write_open = false;
    let (carrier, receiver) = start_datagram_carrier(
        socket,
        handle,
        max_datagram_size,
        receive_buffer,
        read_open,
        write_open,
    );
    let receiver = receiver.expect("UDP bind receiver exists");
    datagram_source_from_carrier(carrier, receiver, local_addr, receive_buffer)
}
/// Builds the datagram source that reads from `receiver` and drives `carrier`
/// with demand.
///
/// The receiver is kept in a shared slot and taken on the first open; opening
/// the resource a second time fails with "UDP receive source already used".
fn datagram_source_from_carrier(
    carrier: UdpCarrier<Datagram>,
    receiver: std_mpsc::Receiver<ReceiveResponse<Datagram>>,
    local_addr: SocketAddr,
    receive_buffer: usize,
) -> Source<Datagram, UdpBinding> {
    let receiver_slot = Arc::new(Mutex::new(Some(receiver)));
    Source::unfold_resource(
        move || {
            let taken = receiver_slot
                .lock()
                .expect("UDP receive resource receiver poisoned")
                .take();
            let receiver = taken
                .ok_or_else(|| StreamError::Failed("UDP receive source already used".to_owned()))?;
            // Prime the carrier with the initial demand before handing out the resource.
            let demand = DemandBatcher::new(receive_buffer);
            carrier.request_demand(demand.initial())?;
            let carrier = carrier.clone();
            Ok(ReceiveResource {
                receiver,
                carrier,
                demand,
            })
        },
        receive_next_item,
        close_receive_resource,
    )
    .map_materialized_value(move |_| UdpBinding { local_addr })
}
/// Builds the payload source that reads from `receiver` and drives a connected
/// `carrier` with demand.
///
/// The receiver is kept in a shared slot and taken on the first open; opening
/// the resource a second time fails with
/// "connected UDP receive source already used".
fn connected_source_from_carrier(
    carrier: UdpCarrier<Vec<u8>>,
    receiver: std_mpsc::Receiver<ReceiveResponse<Vec<u8>>>,
    receive_buffer: usize,
) -> Source<Vec<u8>, NotUsed> {
    let receiver_slot = Arc::new(Mutex::new(Some(receiver)));
    Source::unfold_resource(
        move || {
            let taken = receiver_slot
                .lock()
                .expect("connected UDP receive resource receiver poisoned")
                .take();
            let receiver = taken.ok_or_else(|| {
                StreamError::Failed("connected UDP receive source already used".to_owned())
            })?;
            // Prime the carrier with the initial demand before handing out the resource.
            let demand = DemandBatcher::new(receive_buffer);
            carrier.request_demand(demand.initial())?;
            let carrier = carrier.clone();
            Ok(ReceiveResource {
                receiver,
                carrier,
                demand,
            })
        },
        receive_next_item,
        close_receive_resource,
    )
}
fn receive_next_item<T>(resource: &mut ReceiveResource<T>) -> StreamResult<Option<T>>
where
T: Send + 'static,
{
match resource.receiver.recv() {
Ok(ReceiveResponse::Item(item)) => {
if let Some(demand) = resource.demand.record_consumed() {
resource.carrier.request_demand(demand)?;
}
Ok(Some(item))
}
Ok(ReceiveResponse::Error(error)) => Err(error),
Err(_) => Err(abrupt_termination()),
}
}
/// Closes the read side when the receiving source finishes.
///
/// Always succeeds. The resource is dropped on return, and its `Drop`
/// implementation issues the same best-effort close request again.
fn close_receive_resource<T>(resource: ReceiveResource<T>) -> StreamResult<()>
where
    T: Send + 'static,
{
    let carrier = &resource.carrier;
    carrier.close_read();
    Ok(())
}
/// Spawns the unconnected-socket carrier task on `handle` and returns the
/// handle used to control it.
///
/// The command queue holds at least `receive_buffer` entries. When
/// `read_open` is true a receive queue with `receive_buffer + 1` slots is
/// created and its receiving end is returned; otherwise the second tuple
/// element is `None`.
fn start_datagram_carrier(
    socket: UdpSocket,
    handle: Handle,
    max_datagram_size: usize,
    receive_buffer: usize,
    read_open: bool,
    write_open: bool,
) -> UdpCarrierParts<Datagram> {
    let queue_depth = async_carrier::DEFAULT_COMMAND_BUFFER.max(receive_buffer);
    let (commands, command_receiver) = async_carrier::command_channel(queue_depth, "UDP");
    let (error_tx, error_rx) = std_mpsc::channel();
    let receive_queue =
        read_open.then(|| std_mpsc::sync_channel(receive_buffer.saturating_add(1)));
    let (item_tx, item_rx) = match receive_queue {
        Some((tx, rx)) => (Some(tx), Some(rx)),
        None => (None, None),
    };
    let worker = run_datagram_carrier_task(
        socket,
        max_datagram_size,
        item_tx,
        error_tx,
        command_receiver,
        read_open,
        write_open,
    );
    let inner = UdpCarrierInner {
        commands,
        send_errors: Mutex::new(error_rx),
        task: Mutex::new(Some(handle.spawn(worker))),
    };
    (
        UdpCarrier {
            inner: Arc::new(inner),
        },
        item_rx,
    )
}
/// Spawns the connected-socket carrier task on `handle` and returns the
/// handle used to control it.
///
/// The command queue holds at least `receive_buffer` entries. When
/// `read_open` is true a receive queue with `receive_buffer + 1` slots is
/// created and its receiving end is returned; otherwise the second tuple
/// element is `None`.
fn start_connected_carrier(
    socket: UdpSocket,
    handle: Handle,
    max_datagram_size: usize,
    receive_buffer: usize,
    read_open: bool,
    write_open: bool,
) -> UdpCarrierParts<Vec<u8>> {
    let queue_depth = async_carrier::DEFAULT_COMMAND_BUFFER.max(receive_buffer);
    let (commands, command_receiver) =
        async_carrier::command_channel(queue_depth, "connected UDP");
    let (error_tx, error_rx) = std_mpsc::channel();
    let receive_queue =
        read_open.then(|| std_mpsc::sync_channel(receive_buffer.saturating_add(1)));
    let (item_tx, item_rx) = match receive_queue {
        Some((tx, rx)) => (Some(tx), Some(rx)),
        None => (None, None),
    };
    let worker = run_connected_carrier_task(
        socket,
        max_datagram_size,
        item_tx,
        error_tx,
        command_receiver,
        read_open,
        write_open,
    );
    let inner = UdpCarrierInner {
        commands,
        send_errors: Mutex::new(error_rx),
        task: Mutex::new(Some(handle.spawn(worker))),
    };
    (
        UdpCarrier {
            inner: Arc::new(inner),
        },
        item_rx,
    )
}
/// Event loop of the carrier task for an unconnected UDP socket.
///
/// Processes commands from `commands` and, while the read side is open and
/// demand is outstanding, also receives datagrams from `socket` into a buffer
/// of `max_datagram_size` bytes and forwards them through `receive_sender`.
/// The task ends when both sides are closed, when the command channel is
/// closed, when a command handler returns `false`, or after reporting a
/// socket error.
async fn run_datagram_carrier_task(
socket: UdpSocket,
max_datagram_size: usize,
receive_sender: Option<std_mpsc::SyncSender<ReceiveResponse<Datagram>>>,
send_error_sender: std_mpsc::Sender<StreamError>,
mut commands: tokio_mpsc::Receiver<UdpCarrierCommand<Datagram>>,
mut read_open: bool,
mut write_open: bool,
) {
// One reusable receive buffer; each datagram is copied out of it.
let mut buffer = vec![0_u8; max_datagram_size];
// Number of datagrams the consumer has asked for and not yet been given.
let mut requested = 0_usize;
loop {
if !read_open && !write_open {
return;
}
if read_open && requested > 0 {
// `biased` polls the branches in source order, so a ready command is
// handled before a ready datagram.
tokio::select! {
biased;
command = commands.recv() => {
let Some(command) = command else { return; };
if !handle_datagram_command(
&socket,
command,
&receive_sender,
&send_error_sender,
&mut read_open,
&mut write_open,
&mut requested,
).await {
return;
}
}
received = socket.recv_from(&mut buffer) => {
match received {
Ok((read, remote)) => {
let datagram = Datagram::new(buffer[..read].to_vec(), remote);
match try_send_received_item(&receive_sender, datagram) {
QueueOutcome::Queued => {
requested = requested.saturating_sub(1);
}
QueueOutcome::Dropped => {
// Queue is full: forget outstanding demand and discard whatever
// else is already readable on the socket.
requested = 0;
if let Err(error) = drain_ready_datagrams(&socket, &mut buffer) {
report_carrier_error(&receive_sender, &send_error_sender, error);
return;
}
}
QueueOutcome::Closed => {
read_open = false;
}
}
}
// An interrupted receive is retried on the next loop iteration.
Err(error) if error.kind() == std::io::ErrorKind::Interrupted => {}
Err(error) => {
report_carrier_error(&receive_sender, &send_error_sender, io_error(error));
return;
}
}
}
}
} else {
// Nothing to receive right now: wait for the next command only.
let Some(command) = commands.recv().await else {
return;
};
if !handle_datagram_command(
&socket,
command,
&receive_sender,
&send_error_sender,
&mut read_open,
&mut write_open,
&mut requested,
)
.await
{
return;
}
}
}
}
/// Event loop of the carrier task for a connected UDP socket.
///
/// Processes commands from `commands` and, while the read side is open and
/// demand is outstanding, also receives payloads from `socket` into a buffer
/// of `max_datagram_size` bytes and forwards them through `receive_sender`.
/// The task ends when both sides are closed, when the command channel is
/// closed, when a command handler returns `false`, or after reporting a
/// socket error.
async fn run_connected_carrier_task(
socket: UdpSocket,
max_datagram_size: usize,
receive_sender: Option<std_mpsc::SyncSender<ReceiveResponse<Vec<u8>>>>,
send_error_sender: std_mpsc::Sender<StreamError>,
mut commands: tokio_mpsc::Receiver<UdpCarrierCommand<Vec<u8>>>,
mut read_open: bool,
mut write_open: bool,
) {
// One reusable receive buffer; each payload is copied out of it.
let mut buffer = vec![0_u8; max_datagram_size];
// Number of payloads the consumer has asked for and not yet been given.
let mut requested = 0_usize;
loop {
if !read_open && !write_open {
return;
}
if read_open && requested > 0 {
// `biased` polls the branches in source order, so a ready command is
// handled before a ready payload.
tokio::select! {
biased;
command = commands.recv() => {
let Some(command) = command else { return; };
if !handle_connected_command(
&socket,
command,
&receive_sender,
&send_error_sender,
&mut read_open,
&mut write_open,
&mut requested,
).await {
return;
}
}
received = socket.recv(&mut buffer) => {
match received {
Ok(read) => {
match try_send_received_item(&receive_sender, buffer[..read].to_vec()) {
QueueOutcome::Queued => {
requested = requested.saturating_sub(1);
}
QueueOutcome::Dropped => {
// Queue is full: forget outstanding demand and discard whatever
// else is already readable on the socket.
requested = 0;
if let Err(error) = drain_ready_connected_datagrams(&socket, &mut buffer) {
report_carrier_error(&receive_sender, &send_error_sender, error);
return;
}
}
QueueOutcome::Closed => {
read_open = false;
}
}
}
// An interrupted receive is retried on the next loop iteration.
Err(error) if error.kind() == std::io::ErrorKind::Interrupted => {}
Err(error) => {
report_carrier_error(&receive_sender, &send_error_sender, io_error(error));
return;
}
}
}
}
} else {
// Nothing to receive right now: wait for the next command only.
let Some(command) = commands.recv().await else {
return;
};
if !handle_connected_command(
&socket,
command,
&receive_sender,
&send_error_sender,
&mut read_open,
&mut write_open,
&mut requested,
)
.await
{
return;
}
}
}
}
/// Applies one command to the unconnected-socket carrier state.
///
/// Returns `true` when the task should keep running and `false` when it must
/// stop: a send was requested after the write side closed, or a send failed.
/// In both cases the error has already been reported.
async fn handle_datagram_command(
    socket: &UdpSocket,
    command: UdpCarrierCommand<Datagram>,
    receive_sender: &Option<std_mpsc::SyncSender<ReceiveResponse<Datagram>>>,
    send_error_sender: &std_mpsc::Sender<StreamError>,
    read_open: &mut bool,
    write_open: &mut bool,
    requested: &mut usize,
) -> bool {
    // Both send variants share the same "write side closed" rejection.
    let wants_send = matches!(
        command,
        UdpCarrierCommand::SendOne(_) | UdpCarrierCommand::SendBatch(_)
    );
    if wants_send && !*write_open {
        let error = StreamError::Failed("UDP write side is closed".to_owned());
        report_carrier_error(receive_sender, send_error_sender, error);
        return false;
    }
    match command {
        UdpCarrierCommand::Demand(extra) => {
            *requested = requested.saturating_add(extra);
            true
        }
        UdpCarrierCommand::SendOne(datagram) => {
            send_one_datagram(socket, receive_sender, send_error_sender, datagram).await
        }
        UdpCarrierCommand::SendBatch(datagrams) => {
            // Stop at the first failure; the rest of the batch is discarded.
            let mut all_sent = true;
            for datagram in datagrams {
                if !send_one_datagram(socket, receive_sender, send_error_sender, datagram).await {
                    all_sent = false;
                    break;
                }
            }
            all_sent
        }
        UdpCarrierCommand::CloseRead => {
            *read_open = false;
            true
        }
        UdpCarrierCommand::CloseWrite { ack } => {
            *write_open = false;
            // The requester may have stopped waiting; a failed ack is ignored.
            let _ = ack.send(Ok(()));
            true
        }
    }
}
/// Applies one command to the connected-socket carrier state.
///
/// Returns `true` when the task should keep running and `false` when it must
/// stop: a send was requested after the write side closed, or a send failed.
/// In both cases the error has already been reported.
async fn handle_connected_command(
    socket: &UdpSocket,
    command: UdpCarrierCommand<Vec<u8>>,
    receive_sender: &Option<std_mpsc::SyncSender<ReceiveResponse<Vec<u8>>>>,
    send_error_sender: &std_mpsc::Sender<StreamError>,
    read_open: &mut bool,
    write_open: &mut bool,
    requested: &mut usize,
) -> bool {
    // Both send variants share the same "write side closed" rejection.
    let wants_send = matches!(
        command,
        UdpCarrierCommand::SendOne(_) | UdpCarrierCommand::SendBatch(_)
    );
    if wants_send && !*write_open {
        let error = StreamError::Failed("connected UDP write side is closed".to_owned());
        report_carrier_error(receive_sender, send_error_sender, error);
        return false;
    }
    match command {
        UdpCarrierCommand::Demand(extra) => {
            *requested = requested.saturating_add(extra);
            true
        }
        UdpCarrierCommand::SendOne(payload) => {
            send_one_connected_payload(socket, receive_sender, send_error_sender, payload).await
        }
        UdpCarrierCommand::SendBatch(payloads) => {
            // Stop at the first failure; the rest of the batch is discarded.
            let mut all_sent = true;
            for payload in payloads {
                if !send_one_connected_payload(socket, receive_sender, send_error_sender, payload)
                    .await
                {
                    all_sent = false;
                    break;
                }
            }
            all_sent
        }
        UdpCarrierCommand::CloseRead => {
            *read_open = false;
            true
        }
        UdpCarrierCommand::CloseWrite { ack } => {
            *write_open = false;
            // The requester may have stopped waiting; a failed ack is ignored.
            let _ = ack.send(Ok(()));
            true
        }
    }
}
async fn send_one_datagram(
socket: &UdpSocket,
receive_sender: &Option<std_mpsc::SyncSender<ReceiveResponse<Datagram>>>,
send_error_sender: &std_mpsc::Sender<StreamError>,
datagram: Datagram,
) -> bool {
let expected = datagram.payload.len();
match socket.send_to(&datagram.payload, datagram.remote).await {
Ok(sent) if sent == expected => true,
Ok(sent) => {
report_carrier_error(
receive_sender,
send_error_sender,
short_send_error(sent, expected),
);
false
}
Err(error) => {
report_carrier_error(receive_sender, send_error_sender, io_error(error));
false
}
}
}
async fn send_one_connected_payload(
socket: &UdpSocket,
receive_sender: &Option<std_mpsc::SyncSender<ReceiveResponse<Vec<u8>>>>,
send_error_sender: &std_mpsc::Sender<StreamError>,
payload: Vec<u8>,
) -> bool {
let expected = payload.len();
match socket.send(&payload).await {
Ok(sent) if sent == expected => true,
Ok(sent) => {
report_carrier_error(
receive_sender,
send_error_sender,
short_send_error(sent, expected),
);
false
}
Err(error) => {
report_carrier_error(receive_sender, send_error_sender, io_error(error));
false
}
}
}
/// Tries to enqueue a received item without blocking and classifies the result.
///
/// A missing sender or a disconnected queue is `Closed`, a full queue is
/// `Dropped` (the item is discarded), anything else is `Queued`.
fn try_send_received_item<T>(
    sender: &Option<std_mpsc::SyncSender<ReceiveResponse<T>>>,
    item: T,
) -> QueueOutcome
where
    T: Send + 'static,
{
    match sender {
        None => QueueOutcome::Closed,
        Some(queue) => match queue.try_send(ReceiveResponse::Item(item)) {
            Ok(()) => QueueOutcome::Queued,
            Err(std_mpsc::TrySendError::Full(_)) => QueueOutcome::Dropped,
            Err(std_mpsc::TrySendError::Disconnected(_)) => QueueOutcome::Closed,
        },
    }
}
fn drain_ready_datagrams(socket: &UdpSocket, buffer: &mut [u8]) -> StreamResult<()> {
loop {
match socket.try_recv_from(buffer) {
Ok((_read, _remote)) => {}
Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => return Ok(()),
Err(error) if error.kind() == std::io::ErrorKind::Interrupted => {}
Err(error) => return Err(io_error(error)),
}
}
}
fn drain_ready_connected_datagrams(socket: &UdpSocket, buffer: &mut [u8]) -> StreamResult<()> {
loop {
match socket.try_recv(buffer) {
Ok(_read) => {}
Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => return Ok(()),
Err(error) if error.kind() == std::io::ErrorKind::Interrupted => {}
Err(error) => return Err(io_error(error)),
}
}
}
fn report_carrier_error<T>(
receive_sender: &Option<std_mpsc::SyncSender<ReceiveResponse<T>>>,
send_error_sender: &std_mpsc::Sender<StreamError>,
error: StreamError,
) where
T: Send + 'static,
{
let _ = send_error_sender.send(error.clone());
if let Some(receive_sender) = receive_sender {
let _ = receive_sender.try_send(ReceiveResponse::Error(error));
}
}
/// Builds a flow that hands every incoming datagram to `carrier` and emits
/// `NotUsed` for each one.
///
/// `batch_size` controls how many datagrams are buffered before being passed
/// on as one batch; the resource is flushed and the write side closed when the
/// flow finishes.
fn datagram_send_flow_from_carrier(
    carrier: UdpCarrier<Datagram>,
    batch_size: usize,
) -> Flow<Datagram, NotUsed, NotUsed> {
    let passthrough = Flow::<Datagram, Datagram>::identity();
    passthrough.map_with_resource(
        move || {
            let pending = Vec::with_capacity(batch_size);
            Ok(SendResource {
                carrier: carrier.clone(),
                pending,
                batch_size,
            })
        },
        |resource, datagram| {
            send_datagram(resource, datagram)?;
            Ok(NotUsed)
        },
        close_send_resource,
    )
}
/// Builds a flow that hands every incoming payload to a connected `carrier`
/// and emits `NotUsed` for each one.
///
/// `batch_size` controls how many payloads are buffered before being passed on
/// as one batch; the resource is flushed and the write side closed when the
/// flow finishes.
fn connected_send_flow_from_carrier(
    carrier: UdpCarrier<Vec<u8>>,
    batch_size: usize,
) -> Flow<Vec<u8>, NotUsed, NotUsed> {
    let passthrough = Flow::<Vec<u8>, Vec<u8>>::identity();
    passthrough.map_with_resource(
        move || {
            let pending = Vec::with_capacity(batch_size);
            Ok(SendResource {
                carrier: carrier.clone(),
                pending,
                batch_size,
            })
        },
        |resource, payload| {
            send_connected_payload(resource, payload)?;
            Ok(NotUsed)
        },
        close_send_resource,
    )
}
/// Finishes a send flow: flushes buffered items, then closes the write side.
///
/// Never emits a final element (`Ok(None)` on success).
///
/// # Errors
/// Returns the first error from flushing or from closing the write side.
fn close_send_resource<T>(mut resource: SendResource<T>) -> StreamResult<Option<NotUsed>>
where
    T: Send + 'static,
{
    flush_send_resource(&mut resource)
        .and_then(|()| resource.carrier.close_write())
        .map(|()| None)
}
/// Datagram-typed entry point for `send_item`.
fn send_datagram(resource: &mut SendResource<Datagram>, datagram: Datagram) -> StreamResult<()> {
    send_item::<Datagram>(resource, datagram)
}
/// Payload-typed entry point for `send_item` on connected sockets.
fn send_connected_payload(
    resource: &mut SendResource<Vec<u8>>,
    payload: Vec<u8>,
) -> StreamResult<()> {
    send_item::<Vec<u8>>(resource, payload)
}
/// Passes `item` to the carrier, either immediately or as part of a batch.
///
/// With a batch size of 0 or 1 the item is forwarded right away. Otherwise it
/// is buffered and the buffer is flushed once it holds `batch_size` items.
///
/// # Errors
/// Returns the error from forwarding the item or from flushing the batch.
fn send_item<T>(resource: &mut SendResource<T>, item: T) -> StreamResult<()>
where
    T: Send + 'static,
{
    if resource.batch_size > 1 {
        resource.pending.push(item);
        if resource.pending.len() < resource.batch_size {
            return Ok(());
        }
        return flush_send_resource(resource);
    }
    resource.carrier.send_one(item)
}
/// Hands all buffered items to the carrier as one batch.
///
/// When nothing is buffered, this only checks whether the carrier has
/// reported an error.
///
/// # Errors
/// Returns the pending carrier error (empty buffer) or the error from
/// queueing the batch.
fn flush_send_resource<T>(resource: &mut SendResource<T>) -> StreamResult<()>
where
    T: Send + 'static,
{
    if !resource.pending.is_empty() {
        let batch = std::mem::take(&mut resource.pending);
        return resource.carrier.send_items(batch);
    }
    resource.carrier.check_send_error()
}
/// Builds the error reported when the socket accepted fewer bytes than the
/// datagram contained.
fn short_send_error(sent: usize, expected: usize) -> StreamError {
    let message = format!("UDP socket sent {sent} bytes from {expected}-byte datagram");
    StreamError::Failed(message)
}