naia_shared/connection/
ack_manager.rs1use std::{collections::HashMap, hash::Hash};
2
3use crate::{
4 messages::message_manager::MessageManager, types::PacketIndex,
5 wrapping_number::sequence_greater_than, HostWorldManager, LocalWorldManager,
6};
7
8use super::{
9 packet_notifiable::PacketNotifiable, packet_type::PacketType, sequence_buffer::SequenceBuffer,
10 standard_header::StandardHeader,
11};
12
/// Number of earlier packets acknowledged redundantly in every header.
///
/// One bit of the `u32` ack bitfield is produced/consumed per packet in this
/// window, and the received-packet buffer is sized to this value plus one.
pub const REDUNDANT_PACKET_ACKS_SIZE: u16 = 32;

/// Initial capacity reserved for the map of sent-but-unresolved packets.
const DEFAULT_SEND_PACKETS_SIZE: usize = 256;
15
16pub struct AckManager {
19 next_packet_index: PacketIndex,
21 last_recv_packet_index: PacketIndex,
23 sent_packets: HashMap<PacketIndex, SentPacket>,
26 received_packets: SequenceBuffer<ReceivedPacket>,
29 should_send_empty_ack: bool,
31}
32
33impl AckManager {
34 pub fn new() -> Self {
35 Self {
36 next_packet_index: 0,
37 last_recv_packet_index: u16::MAX,
38 sent_packets: HashMap::with_capacity(DEFAULT_SEND_PACKETS_SIZE),
39 received_packets: SequenceBuffer::with_capacity(REDUNDANT_PACKET_ACKS_SIZE + 1),
40 should_send_empty_ack: false,
41 }
42 }
43
44 pub fn should_send_empty_ack(&self) -> bool {
45 self.should_send_empty_ack
46 }
47
48 pub fn mark_should_send_empty_ack(&mut self) {
49 self.should_send_empty_ack = true;
50 }
51
52 pub fn clear_should_send_empty_ack(&mut self) {
53 self.should_send_empty_ack = false;
54 }
55
56 pub fn next_sender_packet_index(&self) -> PacketIndex {
58 self.next_packet_index
59 }
60
61 pub fn process_incoming_header<E: Copy + Eq + Hash + Send + Sync>(
64 &mut self,
65 header: &StandardHeader,
66 message_manager: &mut MessageManager,
67 host_world_manager: &mut HostWorldManager<E>,
68 local_world_manager: &mut LocalWorldManager<E>,
69 packet_notifiables: &mut [&mut dyn PacketNotifiable],
70 ) {
71 let sender_packet_index = header.sender_packet_index;
72 let sender_ack_index = header.sender_ack_index;
73 let mut sender_ack_bitfield = header.sender_ack_bitfield;
74
75 self.received_packets
76 .insert(sender_packet_index, ReceivedPacket {});
77
78 if sequence_greater_than(sender_ack_index, self.last_recv_packet_index) {
81 self.last_recv_packet_index = sender_ack_index;
82 }
83
84 if let Some(sent_packet) = self.sent_packets.get(&sender_ack_index) {
86 if sent_packet.packet_type == PacketType::Data {
87 self.notify_packet_delivered(
88 sender_ack_index,
89 message_manager,
90 host_world_manager,
91 local_world_manager,
92 packet_notifiables,
93 );
94 }
95
96 self.sent_packets.remove(&sender_ack_index);
97 }
98
99 for i in 1..=REDUNDANT_PACKET_ACKS_SIZE {
103 let sent_packet_index = sender_ack_index.wrapping_sub(i);
104 if let Some(sent_packet) = self.sent_packets.get(&sent_packet_index) {
105 if sender_ack_bitfield & 1 == 1 {
106 if sent_packet.packet_type == PacketType::Data {
107 self.notify_packet_delivered(
108 sent_packet_index,
109 message_manager,
110 host_world_manager,
111 local_world_manager,
112 packet_notifiables,
113 );
114 }
115
116 self.sent_packets.remove(&sent_packet_index);
117 } else {
118 self.sent_packets.remove(&sent_packet_index);
119 }
120 }
121
122 sender_ack_bitfield >>= 1;
123 }
124 }
125
126 fn track_packet(&mut self, packet_type: PacketType, packet_index: PacketIndex) {
128 self.sent_packets
129 .insert(packet_index, SentPacket { packet_type });
130 }
131
132 fn increment_local_packet_index(&mut self) {
134 self.next_packet_index = self.next_packet_index.wrapping_add(1);
135 }
136
137 pub fn next_outgoing_packet_header(&mut self, packet_type: PacketType) -> StandardHeader {
138 let next_packet_index = self.next_sender_packet_index();
139
140 let outgoing = StandardHeader::new(
141 packet_type,
142 next_packet_index,
143 self.last_received_packet_index(),
144 self.ack_bitfield(),
145 );
146
147 self.track_packet(packet_type, next_packet_index);
148 self.increment_local_packet_index();
149
150 outgoing
151 }
152
153 fn notify_packet_delivered<E: Copy + Eq + Hash + Send + Sync>(
154 &self,
155 sent_packet_index: PacketIndex,
156 message_manager: &mut MessageManager,
157 host_world_manager: &mut HostWorldManager<E>,
158 local_world_manager: &mut LocalWorldManager<E>,
159 packet_notifiables: &mut [&mut dyn PacketNotifiable],
160 ) {
161 message_manager.notify_packet_delivered(sent_packet_index);
162 host_world_manager.notify_packet_delivered(sent_packet_index, local_world_manager);
163 for notifiable in packet_notifiables {
164 notifiable.notify_packet_delivered(sent_packet_index);
165 }
166 }
167
168 fn last_received_packet_index(&self) -> PacketIndex {
169 self.received_packets.sequence_num().wrapping_sub(1)
170 }
171
172 fn ack_bitfield(&self) -> u32 {
173 let last_received_remote_packet_index: PacketIndex = self.last_received_packet_index();
174 let mut ack_bitfield: u32 = 0;
175 let mut mask: u32 = 1;
176
177 for i in 1..=REDUNDANT_PACKET_ACKS_SIZE {
180 let received_packet_index = last_received_remote_packet_index.wrapping_sub(i);
181 if self.received_packets.exists(received_packet_index) {
182 ack_bitfield |= mask;
183 }
184 mask <<= 1;
185 }
186
187 ack_bitfield
188 }
189}
190
191#[derive(Clone, Debug, Eq, PartialEq)]
192pub struct SentPacket {
193 pub packet_type: PacketType,
194}
195
/// Zero-sized marker stored in the received-packet buffer; its presence at an
/// index is the only information it carries.
#[derive(Clone, Debug, Default)]
pub struct ReceivedPacket;