use std::{net::SocketAddr, time::Instant};
use crate::protocol::udp::ClusterConfig;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CloseReason {
Idle,
ResponsesReached,
RequestsReached,
Drain,
Aborted,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FlowPhase {
AwaitingBackend,
Established,
Closing,
}
#[derive(Clone, Debug)]
pub struct UdpFlow {
pub client: SocketAddr,
pub backend_id: Option<String>,
pub backend_addr: Option<SocketAddr>,
pub phase: FlowPhase,
pub config: ClusterConfig,
pub requests_seen: u32,
pub responses_seen: u32,
pub idle_deadline: Instant,
pub timer_gen: u64,
pub first_upstream_pending: bool,
pub pending_payload: Option<Vec<u8>>,
}
impl UdpFlow {
pub fn new(client: SocketAddr, config: ClusterConfig, now: Instant) -> Self {
let idle_deadline = now + config.front_timeout;
UdpFlow {
client,
backend_id: None,
backend_addr: None,
phase: FlowPhase::AwaitingBackend,
first_upstream_pending: config.send_proxy_protocol,
config,
requests_seen: 0,
responses_seen: 0,
idle_deadline,
timer_gen: 0,
pending_payload: None,
}
}
pub fn touch(&mut self, timeout: std::time::Duration, now: Instant) -> u64 {
let old_gen = self.timer_gen;
self.idle_deadline = now + timeout;
self.timer_gen = self.timer_gen.wrapping_add(1);
debug_assert_ne!(
self.timer_gen, old_gen,
"touch must advance the generation token (stale expiry would still match)"
);
self.timer_gen
}
pub fn on_client_datagram(&mut self, now: Instant) -> u64 {
debug_assert_eq!(
self.phase,
FlowPhase::Established,
"on_client_datagram (a real forward) requires an Established flow"
);
let before = self.requests_seen;
self.requests_seen = self.requests_seen.saturating_add(1);
debug_assert!(
self.requests_seen >= before,
"requests_seen must be monotonic non-decreasing"
);
self.touch(self.config.front_timeout, now)
}
pub fn on_backend_datagram(&mut self, now: Instant) -> u64 {
debug_assert_eq!(
self.phase,
FlowPhase::Established,
"on_backend_datagram requires an Established flow"
);
let before = self.responses_seen;
self.responses_seen = self.responses_seen.saturating_add(1);
debug_assert!(
self.responses_seen >= before,
"responses_seen must be monotonic non-decreasing"
);
self.touch(self.config.back_timeout, now)
}
pub fn set_phase(&mut self, next: FlowPhase) {
#[cfg(debug_assertions)]
{
let legal = match (self.phase, next) {
(FlowPhase::AwaitingBackend, FlowPhase::Established) => true,
(FlowPhase::AwaitingBackend, FlowPhase::Closing) => true,
(FlowPhase::Established, FlowPhase::Closing) => true,
_ => false,
};
debug_assert!(
legal,
"illegal flow phase transition {:?} -> {next:?}",
self.phase,
);
}
self.phase = next;
}
pub fn requests_exhausted(&self) -> bool {
self.config.requests != 0 && self.requests_seen >= self.config.requests
}
pub fn responses_exhausted(&self) -> bool {
self.config.responses != 0 && self.responses_seen >= self.config.responses
}
pub fn teardown_reason(&self) -> Option<CloseReason> {
let reason = if self.responses_exhausted() {
Some(CloseReason::ResponsesReached)
} else if self.requests_exhausted() {
Some(CloseReason::RequestsReached)
} else {
None
};
#[cfg(debug_assertions)]
{
let exhausted = self.responses_exhausted() || self.requests_exhausted();
debug_assert_eq!(
reason.is_some(),
exhausted,
"teardown_reason boundary: Some={} but exhausted={} (req {}/{}, resp {}/{})",
reason.is_some(),
exhausted,
self.requests_seen,
self.config.requests,
self.responses_seen,
self.config.responses,
);
}
reason
}
pub fn take_proxy_protocol(&mut self) -> bool {
if !self.config.send_proxy_protocol {
return false;
}
if self.config.proxy_protocol_every_datagram {
return true;
}
if self.first_upstream_pending {
self.first_upstream_pending = false;
true
} else {
false
}
}
}