1use std::{
16 collections::{HashMap, VecDeque},
17 hash::{Hash, Hasher},
18 net::SocketAddr,
19 time::Instant,
20};
21
22use slab::Slab;
23
24use crate::protocol::udp::{
25 ClusterConfig, ConfigEvent, DropReason, FlowId, FlowKey, ManagerInput, MetricEvent, Output,
26 Transmit,
27 flow::{CloseReason, FlowPhase, UdpFlow},
28 proxy_protocol::prepend_dgram_header,
29};
30
/// Strategy for deriving the [`FlowKey`] under which a client datagram is tracked.
///
/// `UdpManager` consults the extractor for every client datagram that passed
/// its size and cluster checks; the returned key is looked up in the flow
/// table and, on admission, inserted into it.
pub trait FlowKeyExtractor {
    /// Returns the flow key for `payload` received from `src` under the
    /// cluster configuration `cfg`, or `None` to reject the datagram (the
    /// manager then drops it with `DropReason::Invalid`).
    fn flow_key(&self, src: SocketAddr, payload: &[u8], cfg: &ClusterConfig) -> Option<FlowKey>;
}
42
/// Default [`FlowKeyExtractor`]: keys a flow by the datagram's source address
/// via `FlowKey::from_src`, honouring the cluster's `affinity_with_port` flag.
#[derive(Clone, Copy, Debug, Default)]
pub struct SourceTupleExtractor;
47
48impl FlowKeyExtractor for SourceTupleExtractor {
49 fn flow_key(&self, src: SocketAddr, payload: &[u8], cfg: &ClusterConfig) -> Option<FlowKey> {
50 if payload.is_empty() {
52 return None;
53 }
54 Some(FlowKey::from_src(src, cfg.affinity_with_port))
55 }
56}
57
/// UDP flow manager: consumes `ManagerInput`s and timer firings, and queues
/// the resulting `Output`s for the caller to drain with `poll_output`.
pub struct UdpManager<E: FlowKeyExtractor = SourceTupleExtractor> {
    /// Maps each tracked flow key to the id of its flow in `flows`.
    table: HashMap<FlowKey, FlowId>,
    /// Storage for the tracked flows, indexed by `FlowId`.
    flows: Slab<UdpFlow>,
    /// Admission cap: a datagram for an untracked key is shed once
    /// `flows.len()` has reached this value.
    max_flows: usize,
    /// Datagrams longer than this are dropped as `DropReason::Truncated`.
    max_rx_datagram_size: usize,
    /// Current cluster configuration; cloned into every newly admitted flow.
    cluster: ClusterConfig,
    /// Seed mixed into the affinity hash handed to backend selection.
    hash_seed: u64,
    /// Derives the flow key for incoming client datagrams.
    extractor: E,
    /// Set by `ConfigEvent::Drain`; while set, datagrams for untracked keys
    /// are shed instead of creating a flow.
    draining: bool,

    /// Outputs produced so far and not yet handed out by `poll_output`.
    outputs: VecDeque<Output>,
    /// Deadline most recently computed by `reschedule`; reported by
    /// `poll_timeout`.
    armed_deadline: Option<Instant>,

    /// Debug-only: the largest `max_flows` ever configured, used by the
    /// invariant check as an upper bound on the number of stored flows.
    #[cfg(debug_assertions)]
    max_flows_high_water: usize,
}
95
96impl UdpManager<SourceTupleExtractor> {
97 pub fn new(
99 cluster: ClusterConfig,
100 max_flows: usize,
101 max_rx_datagram_size: usize,
102 hash_seed: u64,
103 ) -> Self {
104 Self::with_extractor(
105 cluster,
106 max_flows,
107 max_rx_datagram_size,
108 hash_seed,
109 SourceTupleExtractor,
110 )
111 }
112}
113
114impl<E: FlowKeyExtractor> UdpManager<E> {
115 pub fn with_extractor(
117 cluster: ClusterConfig,
118 max_flows: usize,
119 max_rx_datagram_size: usize,
120 hash_seed: u64,
121 extractor: E,
122 ) -> Self {
123 UdpManager {
124 table: HashMap::new(),
125 flows: Slab::new(),
126 max_flows,
127 max_rx_datagram_size,
128 cluster,
129 hash_seed,
130 extractor,
131 draining: false,
132 outputs: VecDeque::new(),
133 armed_deadline: None,
134 #[cfg(debug_assertions)]
135 max_flows_high_water: max_flows,
136 }
137 }
138
/// Number of flows currently stored in the slab.
pub fn flow_count(&self) -> usize {
    self.flows.len()
}
145
/// Currently configured admission cap on the number of flows.
pub fn max_flows(&self) -> usize {
    self.max_flows
}
150
/// Whether a `ConfigEvent::Drain` has been received.
pub fn is_draining(&self) -> bool {
    self.draining
}
155
/// The `affinity_with_port` flag of the current cluster configuration.
pub fn affinity_with_port(&self) -> bool {
    self.cluster.affinity_with_port
}
162
/// Returns the flow stored under id `flow`, or `None` if no such flow is
/// currently tracked.
pub fn flow(&self, flow: FlowId) -> Option<&UdpFlow> {
    self.flows.get(flow)
}
167
168 pub fn handle_input(&mut self, input: ManagerInput<'_>, now: Instant) {
172 match input {
173 ManagerInput::ClientDatagram { src, payload } => {
174 self.on_client_datagram(src, payload, now)
175 }
176 ManagerInput::BackendDatagram { flow, payload } => {
177 self.on_backend_datagram(flow, payload, now)
178 }
179 ManagerInput::Config(event) => self.on_config(event, now),
180 ManagerInput::BackendResolved {
181 flow,
182 backend,
183 addr,
184 } => self.on_backend_resolved(flow, backend, addr, now),
185 }
186 self.debug_assert_invariants();
189 }
190
/// Closes `flow` through `close_flow` with the given `reason`, then runs the
/// debug-only invariant check.
///
/// An id that is not currently tracked is a no-op (`close_flow` returns
/// early). `_now` is accepted but not used.
pub fn abort_flow(&mut self, flow: FlowId, _now: Instant, reason: CloseReason) {
    self.close_flow(flow, reason);
    self.debug_assert_invariants();
}
209
210 pub fn close_all(&mut self, now: Instant) {
222 let live: Vec<FlowId> = self.flows.iter().map(|(id, _)| id).collect();
226 for flow_id in live {
227 self.abort_flow(flow_id, now, CloseReason::Drain);
228 }
229 #[cfg(debug_assertions)]
231 {
232 debug_assert_eq!(self.flow_count(), 0, "close_all must drain every flow");
233 debug_assert!(
234 self.table.is_empty(),
235 "close_all must clear every table entry"
236 );
237 debug_assert!(
238 self.armed_deadline.is_none(),
239 "close_all must leave no armed timer"
240 );
241 }
242 self.debug_assert_invariants();
243 }
244
245 fn on_client_datagram(&mut self, src: SocketAddr, payload: &[u8], now: Instant) {
246 if payload.len() > self.max_rx_datagram_size {
248 self.drop_datagram(DropReason::Truncated);
249 return;
250 }
251 if self.cluster.cluster.is_empty() {
253 self.drop_datagram(DropReason::NoBackend);
254 return;
255 }
256 let key = match self.extractor.flow_key(src, payload, &self.cluster) {
258 Some(key) => key,
259 None => {
260 self.drop_datagram(DropReason::Invalid);
261 return;
262 }
263 };
264
265 if let Some(&flow_id) = self.table.get(&key) {
267 self.forward_on_existing_flow(flow_id, payload, now);
268 return;
269 }
270
271 let flows_before_admit = self.flows.len();
278 if self.draining {
279 self.drop_datagram(DropReason::Shed);
280 debug_assert_eq!(
281 self.flows.len(),
282 flows_before_admit,
283 "drain shed must allocate no flow"
284 );
285 return;
286 }
287 if self.flows.len() >= self.max_flows {
288 self.outputs
289 .push_back(Output::Metric(MetricEvent::FlowShed));
290 self.drop_datagram(DropReason::Shed);
291 debug_assert_eq!(
292 self.flows.len(),
293 flows_before_admit,
294 "cap shed must allocate no flow"
295 );
296 return;
297 }
298
299 debug_assert!(
305 self.flows.len() < self.max_flows,
306 "admit path entered while at/over the cap"
307 );
308 debug_assert!(
309 !self.table.contains_key(&key),
310 "admit path entered for a key already in the table (would orphan a flow)"
311 );
312
313 let mut flow = UdpFlow::new(src, self.cluster.clone(), now);
320 flow.pending_payload = Some(payload.to_vec());
321 let key_hash = self.affinity_hash(&flow);
322 let flow_id = self.flows.insert(flow);
323 self.table.insert(key, flow_id);
324
325 debug_assert_eq!(
328 self.flows.len(),
329 flows_before_admit + 1,
330 "admission must add exactly one flow"
331 );
332 debug_assert_eq!(
333 self.table.get(&key),
334 Some(&flow_id),
335 "admission must map the key to the new flow"
336 );
337
338 self.outputs
339 .push_back(Output::Metric(MetricEvent::FlowCreated));
340 self.outputs.push_back(Output::SelectBackend {
341 flow: flow_id,
342 cluster: self.cluster.cluster.clone(),
343 key: key_hash,
344 });
345 self.reschedule();
347 }
348
349 fn forward_on_existing_flow(&mut self, flow_id: FlowId, payload: &[u8], now: Instant) {
350 let Some(flow) = self.flows.get_mut(flow_id) else {
351 self.drop_datagram(DropReason::UnknownFlow);
353 return;
354 };
355
356 match flow.phase {
357 FlowPhase::AwaitingBackend => {
358 flow.pending_payload = Some(payload.to_vec());
364 flow.touch(flow.config.front_timeout, now);
365 self.reschedule();
366 }
367 FlowPhase::Established => {
368 flow.on_client_datagram(now);
369 let backend = flow
370 .backend_addr
371 .expect("Established flow always has a backend address");
372 let mut out = payload.to_vec();
373 if flow.take_proxy_protocol() {
374 prepend_dgram_header(&mut out, flow.client, backend);
375 }
376 let teardown = flow.teardown_reason();
377 self.outputs
378 .push_back(Output::Metric(MetricEvent::DatagramIn(payload.len())));
379 self.outputs.push_back(Output::SendToBackend(Transmit {
380 dst: backend,
381 segment_size: None,
382 payload: out,
383 }));
384 if let Some(reason) = teardown {
385 self.close_flow(flow_id, reason);
386 } else {
387 self.reschedule();
388 }
389 }
390 FlowPhase::Closing => {
391 self.drop_datagram(DropReason::Shed);
393 }
394 }
395 }
396
397 fn on_backend_resolved(
398 &mut self,
399 flow_id: FlowId,
400 backend: String,
401 addr: SocketAddr,
402 now: Instant,
403 ) {
404 let Some(flow) = self.flows.get_mut(flow_id) else {
405 self.drop_datagram(DropReason::UnknownFlow);
407 return;
408 };
409 if flow.phase != FlowPhase::AwaitingBackend {
410 return;
412 }
413 flow.backend_id = Some(backend);
414 flow.backend_addr = Some(addr);
415 flow.set_phase(FlowPhase::Established);
416
417 self.outputs.push_back(Output::OpenUpstream {
420 flow: flow_id,
421 backend: addr,
422 });
423
424 let pending = flow.pending_payload.take();
429 if let Some(mut payload) = pending {
430 let payload_len = payload.len();
431 flow.on_client_datagram(now);
432 if flow.take_proxy_protocol() {
433 prepend_dgram_header(&mut payload, flow.client, addr);
434 }
435 let teardown = flow.teardown_reason();
436 self.outputs
437 .push_back(Output::Metric(MetricEvent::DatagramIn(payload_len)));
438 self.outputs.push_back(Output::SendToBackend(Transmit {
439 dst: addr,
440 segment_size: None,
441 payload,
442 }));
443 if let Some(reason) = teardown {
444 self.close_flow(flow_id, reason);
445 return;
446 }
447 self.reschedule();
449 return;
450 }
451 let _gen = self
454 .flows
455 .get_mut(flow_id)
456 .map(|f| f.touch(self.cluster.front_timeout, now));
457 self.reschedule();
458 }
459
460 fn on_backend_datagram(&mut self, flow_id: FlowId, payload: &[u8], now: Instant) {
461 if payload.len() > self.max_rx_datagram_size {
462 self.drop_datagram(DropReason::Truncated);
463 return;
464 }
465 let Some(flow) = self.flows.get_mut(flow_id) else {
466 self.drop_datagram(DropReason::UnknownFlow);
467 return;
468 };
469 if flow.phase != FlowPhase::Established {
470 self.drop_datagram(DropReason::UnknownFlow);
471 return;
472 }
473 flow.on_backend_datagram(now);
474 let client = flow.client;
475 let teardown = flow.teardown_reason();
476 self.outputs
477 .push_back(Output::Metric(MetricEvent::DatagramOut(payload.len())));
478 self.outputs.push_back(Output::SendToClient(Transmit {
479 dst: client,
480 segment_size: None,
481 payload: payload.to_vec(),
482 }));
483 if let Some(reason) = teardown {
484 self.close_flow(flow_id, reason);
485 } else {
486 self.reschedule();
487 }
488 }
489
490 fn on_config(&mut self, event: ConfigEvent, _now: Instant) {
491 match event {
492 ConfigEvent::SetCluster(cfg) => self.cluster = cfg,
493 ConfigEvent::SetMaxFlows(n) => {
494 self.max_flows = n;
495 #[cfg(debug_assertions)]
499 {
500 self.max_flows_high_water = self.max_flows_high_water.max(n);
501 }
502 }
503 ConfigEvent::SetMaxRxDatagramSize(n) => self.max_rx_datagram_size = n,
504 ConfigEvent::Drain => self.draining = true,
505 }
506 }
507
508 pub fn handle_timeout(&mut self, now: Instant) {
515 let due: Vec<FlowId> = self
517 .flows
518 .iter()
519 .filter(|(_, flow)| flow.idle_deadline <= now)
520 .map(|(id, _)| id)
521 .collect();
522 for flow_id in due {
523 if let Some(flow) = self.flows.get(flow_id) {
526 if flow.idle_deadline <= now && flow.phase != FlowPhase::Closing {
527 self.close_flow(flow_id, CloseReason::Idle);
528 }
529 }
530 }
531 self.reschedule();
532
533 #[cfg(debug_assertions)]
539 if let Some(next) = self.armed_deadline {
540 debug_assert!(
541 next > now,
542 "UdpManager::poll_timeout must strictly advance past a firing: \
543 armed {next:?} <= fired_at {now:?} (busy-loop)"
544 );
545 }
546
547 #[cfg(debug_assertions)]
553 for (id, flow) in self.flows.iter() {
554 debug_assert!(
555 flow.idle_deadline > now,
556 "FlowId {id} still due after handle_timeout: deadline {:?} <= now {now:?}",
557 flow.idle_deadline,
558 );
559 }
560 self.debug_assert_invariants();
561 }
562
/// Returns the deadline most recently armed by `reschedule`, or `None` when
/// no timer is armed.
pub fn poll_timeout(&self) -> Option<Instant> {
    self.armed_deadline
}
570
/// Removes and returns the oldest queued output, or `None` when the queue is
/// empty.
pub fn poll_output(&mut self) -> Option<Output> {
    self.outputs.pop_front()
}
575
576 fn reschedule(&mut self) {
581 let next = self
582 .flows
583 .iter()
584 .filter(|(_, f)| f.phase != FlowPhase::Closing)
585 .map(|(_, f)| f.idle_deadline)
586 .min();
587 if next != self.armed_deadline {
588 self.armed_deadline = next;
589 if let Some(deadline) = next {
590 self.outputs.push_back(Output::ArmTimer(deadline));
591 }
592 }
593 }
594
595 fn close_flow(&mut self, flow_id: FlowId, _reason: CloseReason) {
599 let Some(flow) = self.flows.get_mut(flow_id) else {
600 return;
601 };
602 if flow.phase == FlowPhase::Closing {
603 return;
604 }
605 flow.set_phase(FlowPhase::Closing);
606 let key = FlowKey::from_src(flow.client, self.cluster.affinity_with_port);
607 if self.table.get(&key) == Some(&flow_id) {
610 self.table.remove(&key);
611 } else {
612 let own_key = FlowKey::from_src(flow.client, flow.config.affinity_with_port);
615 if self.table.get(&own_key) == Some(&flow_id) {
616 self.table.remove(&own_key);
617 }
618 }
619 self.flows.remove(flow_id);
620 self.outputs
621 .push_back(Output::Metric(MetricEvent::FlowEvicted));
622 self.outputs.push_back(Output::CloseFlow(flow_id));
623 self.reschedule();
624
625 #[cfg(debug_assertions)]
630 {
631 debug_assert!(
632 !self.flows.contains(flow_id),
633 "close_flow left FlowId {flow_id} in the slab"
634 );
635 debug_assert!(
636 self.table.values().all(|&id| id != flow_id),
637 "close_flow left a table entry mapping to the removed FlowId {flow_id}"
638 );
639 }
640 }
641
642 fn drop_datagram(&mut self, reason: DropReason) {
645 let flows_before_drop = self.flows.len();
651 self.outputs
652 .push_back(Output::Metric(MetricEvent::DatagramDropped(reason)));
653 self.outputs.push_back(Output::Drop(reason));
654 debug_assert_eq!(
655 self.flows.len(),
656 flows_before_drop,
657 "a drop must allocate nothing and free nothing (flows.len() unchanged)"
658 );
659 }
660
661 fn affinity_hash(&self, flow: &UdpFlow) -> u64 {
664 let mut hasher = std::collections::hash_map::DefaultHasher::new();
665 self.hash_seed.hash(&mut hasher);
666 if flow.config.affinity_with_port {
667 flow.client.hash(&mut hasher);
668 } else {
669 flow.client.ip().hash(&mut hasher);
670 }
671 hasher.finish()
672 }
673
/// Debug-only consistency check over the flow table, the slab and the armed
/// timer. Every check is a `debug_assert*`, so a violation panics with a
/// message naming the offending flow or key.
#[cfg(debug_assertions)]
fn check_invariants(&self) {
    use std::collections::HashSet;

    debug_assert_eq!(
        self.flow_count(),
        self.flows.len(),
        "flow_count() must equal flows.len()"
    );

    // Table -> slab: every key must point at a stored flow, and no two keys
    // may point at the same flow.
    let mut seen_ids: HashSet<FlowId> = HashSet::with_capacity(self.table.len());
    for (key, &id) in self.table.iter() {
        debug_assert!(
            self.flows.contains(id),
            "table key {key:?} maps to FlowId {id} absent from the slab (dangling key)"
        );
        debug_assert!(
            seen_ids.insert(id),
            "table injectivity violated: FlowId {id} is the target of two distinct FlowKeys"
        );
    }
    debug_assert!(
        seen_ids.len() <= self.flows.len(),
        "more distinct table targets than live flows"
    );

    // Per-flow checks; also gathers the flow count and the earliest idle
    // deadline for the timer check at the end.
    let mut min_live_deadline: Option<Instant> = None;
    let mut live_count = 0usize;
    for (id, flow) in self.flows.iter() {
        // A stored flow is never in `Closing`.
        debug_assert_ne!(
            flow.phase,
            FlowPhase::Closing,
            "FlowId {id} persists in the slab while Closing (close_flow must remove it)"
        );
        debug_assert!(
            matches!(
                flow.phase,
                FlowPhase::AwaitingBackend | FlowPhase::Established
            ),
            "FlowId {id} has an unexpected live phase {:?}",
            flow.phase
        );

        // The backend address is present exactly when the flow is Established.
        match flow.phase {
            FlowPhase::Established => debug_assert!(
                flow.backend_addr.is_some(),
                "Established FlowId {id} has no backend address"
            ),
            FlowPhase::AwaitingBackend => debug_assert!(
                flow.backend_addr.is_none(),
                "AwaitingBackend FlowId {id} already carries a backend address"
            ),
            FlowPhase::Closing => {}
        }

        // `teardown_reason()` must be set exactly when a request or response
        // cap is exhausted.
        if flow.requests_exhausted() || flow.responses_exhausted() {
            debug_assert!(
                flow.teardown_reason().is_some(),
                "FlowId {id} exhausted a cap (req {}/{}, resp {}/{}) but reports no teardown",
                flow.requests_seen,
                flow.config.requests,
                flow.responses_seen,
                flow.config.responses,
            );
        } else {
            debug_assert!(
                flow.teardown_reason().is_none(),
                "FlowId {id} reports a teardown while within both caps"
            );
        }

        live_count += 1;
        min_live_deadline = Some(match min_live_deadline {
            Some(d) => d.min(flow.idle_deadline),
            None => flow.idle_deadline,
        });
    }

    // The bound is the largest cap ever configured, not the current one.
    debug_assert!(
        self.flows.len() <= self.max_flows_high_water,
        "live flows {} exceed the high-water cap {}",
        self.flows.len(),
        self.max_flows_high_water,
    );

    // Timer coherence: a deadline is armed exactly when flows exist, and it
    // equals the earliest idle deadline among them.
    debug_assert_eq!(
        self.armed_deadline.is_some(),
        live_count > 0,
        "timer coherence: armed_deadline.is_some() ({}) must match having live flows ({})",
        self.armed_deadline.is_some(),
        live_count > 0,
    );
    if let Some(armed) = self.armed_deadline {
        debug_assert_eq!(
            Some(armed),
            min_live_deadline,
            "timer coherence: armed deadline {armed:?} must equal the minimum live idle deadline {min_live_deadline:?}"
        );
    }
}
808
/// Runs `check_invariants` in builds with `debug_assertions`; compiles to an
/// empty function otherwise.
#[inline]
fn debug_assert_invariants(&self) {
    #[cfg(debug_assertions)]
    self.check_invariants();
}
817}
818
819#[cfg(test)]
820mod tests {
821 use std::{
822 net::{IpAddr, Ipv4Addr},
823 time::Duration,
824 };
825
826 use super::*;
827
828 fn cluster(name: &str) -> ClusterConfig {
829 ClusterConfig {
830 cluster: name.to_owned(),
831 front_timeout: Duration::from_secs(30),
832 back_timeout: Duration::from_secs(30),
833 ..Default::default()
834 }
835 }
836
/// Test fixture: the client address `10.0.0.<n>:<port>`.
fn client(n: u8, port: u16) -> SocketAddr {
    let ip: IpAddr = Ipv4Addr::new(10, 0, 0, n).into();
    SocketAddr::new(ip, port)
}
840
/// Test fixture: the backend address `127.0.0.1:5300`.
fn backend() -> SocketAddr {
    let loopback = IpAddr::V4(Ipv4Addr::LOCALHOST);
    SocketAddr::new(loopback, 5300)
}
844
845 fn drain(mgr: &mut UdpManager) -> Vec<Output> {
847 let mut out = Vec::new();
848 while let Some(o) = mgr.poll_output() {
849 out.push(o);
850 }
851 out
852 }
853
/// With a default (empty-named) cluster, a client datagram is dropped as
/// `NoBackend` without creating a flow or requesting a backend.
#[test]
fn unknown_datagram_allocates_nothing_when_no_cluster() {
    let mut manager = UdpManager::new(ClusterConfig::default(), 16, 65535, 0xABCD);
    let start = Instant::now();
    let datagram = ManagerInput::ClientDatagram {
        src: client(1, 1000),
        payload: b"hi",
    };
    manager.handle_input(datagram, start);
    assert_eq!(manager.flow_count(), 0);

    let emitted = drain(&mut manager);
    let dropped_no_backend = emitted
        .iter()
        .any(|o| matches!(o, Output::Drop(DropReason::NoBackend)));
    assert!(dropped_no_backend);
    let asked_for_backend = emitted
        .iter()
        .any(|o| matches!(o, Output::SelectBackend { .. }));
    assert!(!asked_for_backend);
}
877
/// An empty client datagram is dropped as `Invalid` and creates no flow.
#[test]
fn empty_datagram_is_invalid() {
    let mut manager = UdpManager::new(cluster("dns"), 16, 65535, 1);
    let empty = ManagerInput::ClientDatagram {
        src: client(1, 1000),
        payload: b"",
    };
    manager.handle_input(empty, Instant::now());
    assert_eq!(manager.flow_count(), 0);

    let emitted = drain(&mut manager);
    let rejected = emitted
        .iter()
        .any(|o| matches!(o, Output::Drop(DropReason::Invalid)));
    assert!(rejected);
}
895
/// A client datagram longer than the receive limit (4 bytes here) is dropped
/// as `Truncated` and creates no flow.
#[test]
fn truncated_datagram_dropped_before_admission() {
    let mut manager = UdpManager::new(cluster("dns"), 16, 4, 1);
    let oversized = ManagerInput::ClientDatagram {
        src: client(1, 1000),
        payload: b"toolong",
    };
    manager.handle_input(oversized, Instant::now());
    assert_eq!(manager.flow_count(), 0);

    let emitted = drain(&mut manager);
    let truncated = emitted
        .iter()
        .any(|o| matches!(o, Output::Drop(DropReason::Truncated)));
    assert!(truncated);
}
913
/// The first datagram of a new flow only requests a backend; the buffered
/// payload is sent once the backend is resolved.
#[test]
fn new_flow_requests_backend_then_forwards_buffered_datagram() {
    let mut manager = UdpManager::new(cluster("dns"), 16, 65535, 7);
    let start = Instant::now();
    let from = client(1, 1000);
    manager.handle_input(
        ManagerInput::ClientDatagram {
            src: from,
            payload: b"query",
        },
        start,
    );
    assert_eq!(manager.flow_count(), 1);

    let after_admit = drain(&mut manager);
    let (flow_id, cluster_name) = after_admit
        .iter()
        .filter_map(|o| match o {
            Output::SelectBackend { flow, cluster, .. } => Some((*flow, cluster.clone())),
            _ => None,
        })
        .next()
        .expect("SelectBackend emitted");
    assert_eq!(cluster_name, "dns");
    let sent_early = after_admit
        .iter()
        .any(|o| matches!(o, Output::SendToBackend(_)));
    assert!(!sent_early);

    manager.handle_input(
        ManagerInput::BackendResolved {
            flow: flow_id,
            backend: "b1".to_owned(),
            addr: backend(),
        },
        start,
    );
    let after_resolve = drain(&mut manager);
    let opened = after_resolve
        .iter()
        .any(|o| matches!(o, Output::OpenUpstream { .. }));
    assert!(opened);
    let flushed = after_resolve
        .iter()
        .filter_map(|o| match o {
            Output::SendToBackend(t) => Some(t.clone()),
            _ => None,
        })
        .next()
        .expect("buffered datagram flushed on resolve");
    assert_eq!(flushed.dst, backend());
    assert_eq!(flushed.payload, b"query");
}
962
/// A second datagram from the same client reuses the established flow: no
/// new backend selection, and the payload goes straight to the backend.
#[test]
fn tracked_flow_reuses_backend() {
    let mut manager = UdpManager::new(cluster("dns"), 16, 65535, 7);
    let start = Instant::now();
    let from = client(1, 1000);

    let first = ManagerInput::ClientDatagram {
        src: from,
        payload: b"q1",
    };
    manager.handle_input(first, start);
    let mut pending_select = None;
    for output in drain(&mut manager) {
        if let Output::SelectBackend { flow, .. } = output {
            pending_select = Some(flow);
            break;
        }
    }
    let flow_id = pending_select.unwrap();
    let resolved = ManagerInput::BackendResolved {
        flow: flow_id,
        backend: "b1".to_owned(),
        addr: backend(),
    };
    manager.handle_input(resolved, start);
    drain(&mut manager);

    let second = ManagerInput::ClientDatagram {
        src: from,
        payload: b"q2",
    };
    manager.handle_input(second, start);
    assert_eq!(manager.flow_count(), 1);

    let emitted = drain(&mut manager);
    let reselected = emitted
        .iter()
        .any(|o| matches!(o, Output::SelectBackend { .. }));
    assert!(!reselected);
    let forwarded = emitted
        .iter()
        .any(|o| matches!(o, Output::SendToBackend(t) if t.payload == b"q2"));
    assert!(forwarded);
}
1012
/// With `responses = 1`, the first backend reply is relayed to the client
/// and the flow is closed immediately afterwards.
#[test]
fn responses_one_closes_flow_after_single_reply() {
    let mut config = cluster("dns");
    config.responses = 1;
    let mut manager = UdpManager::new(config, 16, 65535, 7);
    let start = Instant::now();
    // Client datagram "q" followed by backend resolution, outputs drained.
    let flow_id = establish(&mut manager, client(1, 1000), start);

    let reply = ManagerInput::BackendDatagram {
        flow: flow_id,
        payload: b"answer",
    };
    manager.handle_input(reply, start);

    let emitted = drain(&mut manager);
    let relayed = emitted
        .iter()
        .any(|o| matches!(o, Output::SendToClient(t) if t.payload == b"answer"));
    assert!(relayed);
    let closed = emitted
        .iter()
        .any(|o| matches!(o, Output::CloseFlow(f) if *f == flow_id));
    assert!(closed);
    assert_eq!(manager.flow_count(), 0);
}
1057
/// With `requests = 2`, the second client datagram exhausts the cap and the
/// flow is closed.
#[test]
fn requests_cap_closes_flow() {
    let mut config = cluster("syslog");
    config.requests = 2;
    let mut manager = UdpManager::new(config, 16, 65535, 7);
    let start = Instant::now();
    let from = client(1, 1000);

    manager.handle_input(
        ManagerInput::ClientDatagram {
            src: from,
            payload: b"1",
        },
        start,
    );
    let flow_id = drain(&mut manager)
        .into_iter()
        .filter_map(|o| match o {
            Output::SelectBackend { flow, .. } => Some(flow),
            _ => None,
        })
        .next()
        .unwrap();
    manager.handle_input(
        ManagerInput::BackendResolved {
            flow: flow_id,
            backend: "b1".to_owned(),
            addr: backend(),
        },
        start,
    );
    drain(&mut manager);

    manager.handle_input(
        ManagerInput::ClientDatagram {
            src: from,
            payload: b"2",
        },
        start,
    );
    let emitted = drain(&mut manager);
    let closed = emitted.iter().any(|o| matches!(o, Output::CloseFlow(_)));
    assert!(closed);
    assert_eq!(manager.flow_count(), 0);
}
1088
/// With a cap of one flow, a datagram from a second client is shed: the
/// `FlowShed` metric and a `Shed` drop are emitted and no flow is added.
#[test]
fn flow_table_full_sheds_new_flow() {
    let mut manager = UdpManager::new(cluster("dns"), 1, 65535, 7);
    let start = Instant::now();

    let admitted = ManagerInput::ClientDatagram {
        src: client(1, 1000),
        payload: b"a",
    };
    manager.handle_input(admitted, start);
    assert_eq!(manager.flow_count(), 1);
    drain(&mut manager);

    let over_cap = ManagerInput::ClientDatagram {
        src: client(2, 1000),
        payload: b"b",
    };
    manager.handle_input(over_cap, start);
    assert_eq!(manager.flow_count(), 1);

    let emitted = drain(&mut manager);
    let shed_metric = emitted
        .iter()
        .any(|o| matches!(o, Output::Metric(MetricEvent::FlowShed)));
    assert!(shed_metric);
    let shed_drop = emitted
        .iter()
        .any(|o| matches!(o, Output::Drop(DropReason::Shed)));
    assert!(shed_drop);
}
1121
/// A flow with a 10-second front timeout is closed by a timer firing at
/// +11 s, leaving no flow and no armed timer.
#[test]
fn idle_timeout_closes_flow() {
    let front = Duration::from_secs(10);
    let mut config = cluster("dns");
    config.front_timeout = front;
    let mut manager = UdpManager::new(config, 16, 65535, 7);
    let start = Instant::now();

    let datagram = ManagerInput::ClientDatagram {
        src: client(1, 1000),
        payload: b"q",
    };
    manager.handle_input(datagram, start);
    drain(&mut manager);

    let armed = manager.poll_timeout().expect("timer armed");
    assert!(armed >= start + front);

    manager.handle_timeout(start + Duration::from_secs(11));
    let emitted = drain(&mut manager);
    let closed = emitted.iter().any(|o| matches!(o, Output::CloseFlow(_)));
    assert!(closed);
    assert_eq!(manager.flow_count(), 0);
    assert!(manager.poll_timeout().is_none());
}
1145
/// Traffic at +5 s bumps the flow's timer generation, so the timer firing at
/// the original +10 s deadline leaves the flow alive; it expires at +16 s.
#[test]
fn idle_race_resolved_by_generation_token() {
    let mut config = cluster("dns");
    config.front_timeout = Duration::from_secs(10);
    let mut manager = UdpManager::new(config, 16, 65535, 7);
    let start = Instant::now();
    let from = client(1, 1000);
    // Client datagram "q" followed by backend resolution, outputs drained.
    let flow_id = establish(&mut manager, from, start);
    let gen_before = manager.flow(flow_id).unwrap().timer_gen;

    let refresh_at = start + Duration::from_secs(5);
    let refresh = ManagerInput::ClientDatagram {
        src: from,
        payload: b"q2",
    };
    manager.handle_input(refresh, refresh_at);
    drain(&mut manager);
    let gen_after = manager.flow(flow_id).unwrap().timer_gen;
    assert_ne!(gen_before, gen_after, "generation token must bump on touch");

    manager.handle_timeout(start + Duration::from_secs(10));
    assert_eq!(manager.flow_count(), 1, "refreshed flow survives stale expiry");
    manager.handle_timeout(start + Duration::from_secs(16));
    assert_eq!(manager.flow_count(), 0);
}
1192
/// After `ConfigEvent::Drain`, a datagram from a new client is shed while
/// the already tracked flow stays in place.
#[test]
fn drain_sheds_new_flows_but_keeps_existing() {
    let mut manager = UdpManager::new(cluster("dns"), 16, 65535, 7);
    let start = Instant::now();

    let existing = ManagerInput::ClientDatagram {
        src: client(1, 1000),
        payload: b"q",
    };
    manager.handle_input(existing, start);
    drain(&mut manager);

    manager.handle_input(ManagerInput::Config(ConfigEvent::Drain), start);
    let newcomer = ManagerInput::ClientDatagram {
        src: client(2, 1000),
        payload: b"q",
    };
    manager.handle_input(newcomer, start);
    assert_eq!(manager.flow_count(), 1, "existing flow kept, new flow shed");

    let emitted = drain(&mut manager);
    let shed = emitted
        .iter()
        .any(|o| matches!(o, Output::Drop(DropReason::Shed)));
    assert!(shed);
}
1216
/// A flow created with `responses = 0` stays open after a backend reply even
/// though the cluster was reconfigured to `responses = 1` in between.
#[test]
fn reconfig_midflow_preserves_existing_flow_contract() {
    let mut original = cluster("dns");
    original.responses = 0;
    let mut manager = UdpManager::new(original, 16, 65535, 7);
    let start = Instant::now();
    // Client datagram "q" followed by backend resolution, outputs drained.
    let flow_id = establish(&mut manager, client(1, 1000), start);

    let mut updated = cluster("dns");
    updated.responses = 1;
    manager.handle_input(ManagerInput::Config(ConfigEvent::SetCluster(updated)), start);

    let reply = ManagerInput::BackendDatagram {
        flow: flow_id,
        payload: b"reply",
    };
    manager.handle_input(reply, start);
    assert_eq!(manager.flow_count(), 1);
}
1255
/// Ten flows with a 5-second front timeout are all closed by one timer
/// firing at +6 s, with one `CloseFlow` each and no timer left armed.
#[test]
fn reaper_drains_active_flows_to_zero() {
    let mut config = cluster("dns");
    config.front_timeout = Duration::from_secs(5);
    let mut manager = UdpManager::new(config, 64, 65535, 7);
    let start = Instant::now();

    (1..=10u8).for_each(|n| {
        let datagram = ManagerInput::ClientDatagram {
            src: client(n, 1000),
            payload: b"q",
        };
        manager.handle_input(datagram, start);
    });
    assert_eq!(manager.flow_count(), 10);
    drain(&mut manager);

    manager.handle_timeout(start + Duration::from_secs(6));
    assert_eq!(manager.flow_count(), 0, "reaper drains every flow, no leak");

    let mut closes = 0usize;
    for output in drain(&mut manager) {
        if matches!(output, Output::CloseFlow(_)) {
            closes += 1;
        }
    }
    assert_eq!(closes, 10);
    assert!(manager.poll_timeout().is_none(), "no armed timer after drain");
}
1284
/// With proxy protocol enabled but not per-datagram, only the first datagram
/// sent to the backend carries the PPv2 header; the second is sent verbatim.
#[test]
fn proxy_protocol_first_datagram_only() {
    let mut config = cluster("dns");
    config.send_proxy_protocol = true;
    config.proxy_protocol_every_datagram = false;
    let mut manager = UdpManager::new(config, 16, 65535, 7);
    let start = Instant::now();
    let from = client(1, 1000);

    manager.handle_input(
        ManagerInput::ClientDatagram {
            src: from,
            payload: b"q1",
        },
        start,
    );
    let flow_id = drain(&mut manager)
        .into_iter()
        .filter_map(|o| match o {
            Output::SelectBackend { flow, .. } => Some(flow),
            _ => None,
        })
        .next()
        .unwrap();
    manager.handle_input(
        ManagerInput::BackendResolved {
            flow: flow_id,
            backend: "b1".to_owned(),
            addr: backend(),
        },
        start,
    );
    let first = drain(&mut manager)
        .into_iter()
        .filter_map(|o| match o {
            Output::SendToBackend(t) => Some(t.payload),
            _ => None,
        })
        .next()
        .unwrap();
    assert!(first.len() > 2, "PPv2 header prepended to first datagram");
    let signature_start = [0x0D, 0x0A, 0x0D, 0x0A];
    assert_eq!(&first[..4], &signature_start);
    assert_eq!(first[12], 0x21);
    assert_eq!(first[13], 0x12);
    let tail_start = first.len() - 2;
    assert_eq!(&first[tail_start..], b"q1");

    manager.handle_input(
        ManagerInput::ClientDatagram {
            src: from,
            payload: b"q2",
        },
        start,
    );
    let second = drain(&mut manager)
        .into_iter()
        .filter_map(|o| match o {
            Output::SendToBackend(t) => Some(t.payload),
            _ => None,
        })
        .next()
        .unwrap();
    assert_eq!(second, b"q2", "no PPv2 prefix on subsequent datagrams");
}
1346
1347 fn establish(mgr: &mut UdpManager, src: SocketAddr, now: Instant) -> FlowId {
1351 mgr.handle_input(ManagerInput::ClientDatagram { src, payload: b"q" }, now);
1352 let flow = drain(mgr)
1353 .into_iter()
1354 .find_map(|o| match o {
1355 Output::SelectBackend { flow, .. } => Some(flow),
1356 _ => None,
1357 })
1358 .unwrap();
1359 mgr.handle_input(
1360 ManagerInput::BackendResolved {
1361 flow,
1362 backend: "b1".to_owned(),
1363 addr: backend(),
1364 },
1365 now,
1366 );
1367 drain(mgr);
1368 flow
1369 }
1370
/// `close_all` over four established and two backend-awaiting flows emits
/// exactly one `FlowEvicted` and one `CloseFlow` per flow and leaves no flow
/// and no armed timer.
#[test]
fn close_all_evicts_every_flow_exactly_once() {
    const N: usize = 6;
    let mut config = cluster("dns");
    config.front_timeout = Duration::from_secs(30);
    let mut manager = UdpManager::new(config, 64, 65535, 7);
    let start = Instant::now();

    // Flows 1..=4 are fully established.
    let mut flow_ids: Vec<FlowId> = (1..=4u8)
        .map(|n| establish(&mut manager, client(n, 1000), start))
        .collect();
    // Flows 5..=6 stay in the backend-awaiting phase.
    for n in 5..=6u8 {
        let datagram = ManagerInput::ClientDatagram {
            src: client(n, 1000),
            payload: b"q",
        };
        manager.handle_input(datagram, start);
        let awaiting = drain(&mut manager)
            .into_iter()
            .filter_map(|o| match o {
                Output::SelectBackend { flow, .. } => Some(flow),
                _ => None,
            })
            .next()
            .unwrap();
        flow_ids.push(awaiting);
    }
    assert_eq!(manager.flow_count(), N);

    manager.close_all(start);
    let emitted = drain(&mut manager);
    let mut evicted = 0usize;
    let mut closed = 0usize;
    for output in &emitted {
        match output {
            Output::Metric(MetricEvent::FlowEvicted) => evicted += 1,
            Output::CloseFlow(_) => closed += 1,
            _ => {}
        }
    }
    assert_eq!(evicted, N, "one FlowEvicted per live flow");
    assert_eq!(closed, N, "one CloseFlow per live flow");
    assert_eq!(manager.flow_count(), 0, "flow table + slab drained to zero");
    assert!(
        manager.poll_timeout().is_none(),
        "no armed timer after close_all"
    );
}
1421
/// `close_all` emits nothing on an empty manager, closes a single flow
/// exactly once, and emits nothing when repeated.
#[test]
fn close_all_is_idempotent_and_empty_safe() {
    let mut manager = UdpManager::new(cluster("dns"), 16, 65535, 7);
    let start = Instant::now();

    manager.close_all(start);
    let on_empty = drain(&mut manager);
    assert!(on_empty.is_empty());
    assert_eq!(manager.flow_count(), 0);

    establish(&mut manager, client(1, 1000), start);
    assert_eq!(manager.flow_count(), 1);
    manager.close_all(start);
    let close_outputs = drain(&mut manager)
        .into_iter()
        .filter(|o| matches!(o, Output::CloseFlow(_)))
        .count();
    assert_eq!(close_outputs, 1);

    manager.close_all(start);
    assert!(drain(&mut manager).is_empty(), "second close_all emits nothing");
    assert_eq!(manager.flow_count(), 0);
}
1448
/// Aborting a flow created via `establish` emits `FlowEvicted` and a
/// `CloseFlow` for that flow; aborting the same id again is silent.
#[test]
fn abort_flow_closes_established_flow() {
    let mut mgr = UdpManager::new(cluster("dns"), 16, 65535, 7);
    let now = Instant::now();
    let flow = establish(&mut mgr, client(1, 1000), now);
    assert_eq!(mgr.flow_count(), 1);

    mgr.abort_flow(flow, now, CloseReason::Aborted);
    let outs = drain(&mut mgr);
    let saw_eviction_metric = outs
        .iter()
        .any(|o| matches!(o, Output::Metric(MetricEvent::FlowEvicted)));
    assert!(saw_eviction_metric);
    let saw_close_for_flow = outs
        .iter()
        .any(|o| matches!(o, Output::CloseFlow(f) if *f == flow));
    assert!(saw_close_for_flow);
    assert_eq!(mgr.flow_count(), 0);

    // The id no longer refers to a live flow, so a repeat abort does nothing.
    mgr.abort_flow(flow, now, CloseReason::Aborted);
    let outs_after_repeat = drain(&mut mgr);
    assert!(outs_after_repeat.is_empty());
    assert_eq!(mgr.flow_count(), 0);
}
1473
/// Aborting a flow that is still in `FlowPhase::AwaitingBackend` frees its
/// slot, leaves no timer armed, and lets the same source open a new flow.
#[test]
fn abort_flow_closes_awaiting_backend_flow() {
    let mut mgr = UdpManager::new(cluster("dns"), 16, 65535, 7);
    let now = Instant::now();
    let src = client(1, 1000);

    // First datagram: the manager asks for a backend and the flow waits.
    mgr.handle_input(ManagerInput::ClientDatagram { src, payload: b"q" }, now);
    let mut selected = None;
    for out in drain(&mut mgr) {
        if let Output::SelectBackend { flow, .. } = out {
            selected = Some(flow);
            break;
        }
    }
    let flow = selected.unwrap();
    assert_eq!(mgr.flow_count(), 1);
    assert_eq!(mgr.flow(flow).unwrap().phase, FlowPhase::AwaitingBackend);

    mgr.abort_flow(flow, now, CloseReason::Aborted);
    let outs = drain(&mut mgr);
    let saw_eviction_metric = outs
        .iter()
        .any(|o| matches!(o, Output::Metric(MetricEvent::FlowEvicted)));
    assert!(saw_eviction_metric);
    let saw_close_for_flow = outs
        .iter()
        .any(|o| matches!(o, Output::CloseFlow(f) if *f == flow));
    assert!(saw_close_for_flow);
    assert_eq!(mgr.flow_count(), 0, "AwaitingBackend slot freed by abort");
    assert!(mgr.poll_timeout().is_none());

    // The same source address can start over with a fresh flow.
    mgr.handle_input(
        ManagerInput::ClientDatagram {
            src: client(1, 1000),
            payload: b"q2",
        },
        now,
    );
    assert_eq!(mgr.flow_count(), 1);
}
1520
/// Aborting an id that was never handed out produces no output and leaves
/// the manager empty.
#[test]
fn abort_flow_unknown_id_is_noop() {
    let mut mgr = UdpManager::new(cluster("dns"), 16, 65535, 7);
    let now = Instant::now();

    mgr.abort_flow(999, now, CloseReason::Aborted);

    let outs = drain(&mut mgr);
    assert!(outs.is_empty());
    assert_eq!(mgr.flow_count(), 0);
}
1529
/// With `requests = 2`, datagrams received while the flow still waits for a
/// backend do not advance `requests_seen`; only datagrams actually sent to
/// the backend do, and the second such send closes the flow.
#[test]
fn requests_counts_forwards_not_buffered_during_await() {
    let mut cfg = cluster("syslog");
    cfg.requests = 2;
    let mut mgr = UdpManager::new(cfg, 16, 65535, 7);
    let now = Instant::now();
    let src = client(1, 1000);

    // Datagram "1" opens the flow and triggers backend selection.
    mgr.handle_input(ManagerInput::ClientDatagram { src, payload: b"1" }, now);
    let mut selected = None;
    for out in drain(&mut mgr) {
        if let Output::SelectBackend { flow, .. } = out {
            selected = Some(flow);
            break;
        }
    }
    let flow = selected.unwrap();

    // A burst arrives before any backend has been resolved.
    for payload in [&b"2"[..], &b"3"[..], &b"4"[..]] {
        mgr.handle_input(ManagerInput::ClientDatagram { src, payload }, now);
    }
    drain(&mut mgr);
    assert_eq!(mgr.flow_count(), 1, "await burst must not close the flow");
    let seen_while_waiting = mgr.flow(flow).unwrap().requests_seen;
    assert_eq!(
        seen_while_waiting, 0,
        "buffered-only datagrams must not count toward requests"
    );

    // Resolving the backend flushes a single datagram: the most recent one.
    let resolved = ManagerInput::BackendResolved {
        flow,
        backend: String::from("b1"),
        addr: backend(),
    };
    mgr.handle_input(resolved, now);
    let outs = drain(&mut mgr);
    let forwarded_newest = outs
        .iter()
        .any(|o| matches!(o, Output::SendToBackend(t) if t.payload == b"4"));
    assert!(
        forwarded_newest,
        "newest buffered datagram is the one forwarded"
    );
    let closed_early = outs.iter().any(|o| matches!(o, Output::CloseFlow(_)));
    assert!(!closed_early, "one forward must not reach requests=2");
    assert_eq!(mgr.flow_count(), 1);
    assert_eq!(mgr.flow(flow).unwrap().requests_seen, 1);

    // Datagram "5" is the second real forward and hits the limit.
    mgr.handle_input(ManagerInput::ClientDatagram { src, payload: b"5" }, now);
    let outs = drain(&mut mgr);
    let forwarded_fifth = outs
        .iter()
        .any(|o| matches!(o, Output::SendToBackend(t) if t.payload == b"5"));
    assert!(forwarded_fifth);
    let closed_at_limit = outs.iter().any(|o| matches!(o, Output::CloseFlow(_)));
    assert!(closed_at_limit, "second real forward reaches requests=2");
    assert_eq!(mgr.flow_count(), 0);
}
1600
1601 use quickcheck::{Arbitrary, Gen, quickcheck};
1604
1605 #[derive(Clone, Debug)]
1608 enum Step {
1609 Client(u8),
1611 Tick(u8),
1613 }
1614
1615 impl Arbitrary for Step {
1616 fn arbitrary(g: &mut Gen) -> Self {
1617 if bool::arbitrary(g) {
1618 Step::Client(u8::arbitrary(g))
1619 } else {
1620 Step::Tick(u8::arbitrary(g) % 40)
1621 }
1622 }
1623 }
1624
/// Property: for any sequence of client datagrams and clock ticks the flow
/// count never exceeds the configured maximum, and after a final long tick
/// the manager is empty, has no timer armed, and has emitted as many
/// `CloseFlow` outputs as `FlowCreated` metrics.
#[test]
fn prop_flow_invariants() {
    fn prop(steps: Vec<Step>) -> bool {
        const MAX_FLOWS: usize = 4;

        let mut cfg = cluster("dns");
        cfg.front_timeout = Duration::from_secs(10);
        let mut mgr = UdpManager::new(cfg, MAX_FLOWS, 65535, 0x5EED);
        let mut now = Instant::now();
        let (mut created, mut closed) = (0usize, 0usize);

        for step in steps {
            match step {
                Step::Client(id) => {
                    // Only eight distinct sources, so flows get reused.
                    let slot = id % 8;
                    let src = client(slot, 9000 + u16::from(slot));
                    mgr.handle_input(ManagerInput::ClientDatagram { src, payload: b"q" }, now);
                }
                Step::Tick(secs) => {
                    now += Duration::from_secs(u64::from(secs));
                    mgr.handle_timeout(now);
                }
            }

            // Tally lifecycle events produced by this step.
            while let Some(out) = mgr.poll_output() {
                if matches!(out, Output::Metric(MetricEvent::FlowCreated)) {
                    created += 1;
                } else if matches!(out, Output::CloseFlow(_)) {
                    closed += 1;
                }
            }

            if mgr.flow_count() > MAX_FLOWS {
                return false;
            }
        }

        // Jump well past the 10 s front timeout so every flow can expire.
        now += Duration::from_secs(60);
        mgr.handle_timeout(now);
        closed += std::iter::from_fn(|| mgr.poll_output())
            .filter(|out| matches!(out, Output::CloseFlow(_)))
            .count();

        mgr.flow_count() == 0 && mgr.poll_timeout().is_none() && created == closed
    }
    quickcheck(prop as fn(Vec<Step>) -> bool);
}
1680
/// Property: a flow refreshed by a second datagram is still alive when the
/// timeout is handled at the original deadline, and is gone once the
/// timeout is handled past the refreshed deadline.
#[test]
fn prop_generation_token_defeats_stale_close() {
    fn prop(refresh_offset: u8) -> bool {
        const TIMEOUT_SECS: u64 = 20;
        // Refresh strictly inside the first timeout window: 1..=19 seconds.
        let offset = 1 + u64::from(refresh_offset) % (TIMEOUT_SECS - 1);

        let mut cfg = cluster("dns");
        cfg.front_timeout = Duration::from_secs(TIMEOUT_SECS);
        let mut mgr = UdpManager::new(cfg, 8, 65535, 1);
        let now = Instant::now();
        let src = client(1, 1000);

        // Open the flow and discard whatever that produced.
        mgr.handle_input(ManagerInput::ClientDatagram { src, payload: b"q" }, now);
        std::iter::from_fn(|| mgr.poll_output()).for_each(drop);

        // A second datagram from the same source arrives `offset` seconds later.
        let refreshed_at = now + Duration::from_secs(offset);
        let refresh = ManagerInput::ClientDatagram {
            src,
            payload: b"q2",
        };
        mgr.handle_input(refresh, refreshed_at);
        std::iter::from_fn(|| mgr.poll_output()).for_each(drop);

        // Handling the timeout at the original deadline must not close the flow.
        mgr.handle_timeout(now + Duration::from_secs(TIMEOUT_SECS));
        std::iter::from_fn(|| mgr.poll_output()).for_each(drop);
        let survived_original_deadline = mgr.flow_count() == 1;
        if !survived_original_deadline {
            return false;
        }

        // One second past the refreshed deadline the flow must be gone.
        mgr.handle_timeout(refreshed_at + Duration::from_secs(TIMEOUT_SECS + 1));
        std::iter::from_fn(|| mgr.poll_output()).for_each(drop);
        mgr.flow_count() == 0
    }
    quickcheck(prop as fn(u8) -> bool);
}
1724}