naia_shared/connection/
ack_manager.rs

1use std::{collections::HashMap, hash::Hash};
2
3use crate::{
4    messages::message_manager::MessageManager, types::PacketIndex,
5    wrapping_number::sequence_greater_than, HostWorldManager, LocalWorldManager,
6};
7
8use super::{
9    packet_notifiable::PacketNotifiable, packet_type::PacketType, sequence_buffer::SequenceBuffer,
10    standard_header::StandardHeader,
11};
12
/// Number of packets preceding the most recently received one that an ack bitfield reports
/// on, one bit per packet. Also used (plus one) as the capacity of the received-packet buffer.
pub const REDUNDANT_PACKET_ACKS_SIZE: u16 = 32;
/// Initial capacity reserved for the map that tracks sent packets.
const DEFAULT_SEND_PACKETS_SIZE: usize = 256;
15
/// Keeps track of sent & received packets, and contains ack information that is
/// copied into the standard header on each outgoing packet
pub struct AckManager {
    // Index given to the next outgoing packet; bumped (with wrapping) every time an outgoing
    // header is produced.
    next_packet_index: PacketIndex,
    // The newest (in wrapping order) ack index the remote host has reported for packets we
    // sent. Starts at `u16::MAX`, i.e. one before the first packet index `0`.
    last_recv_packet_index: PacketIndex,
    // Every packet we have sent that has not yet been covered by an incoming ack, keyed by
    // packet index. The stored packet type decides whether a delivery notification is emitted
    // once the packet is acked.
    sent_packets: HashMap<PacketIndex, SentPacket>,
    // Packets received from the remote host. An outgoing header can only ack
    // `REDUNDANT_PACKET_ACKS_SIZE + 1` packets (the latest one plus one bitfield bit per older
    // packet), so the buffer is created with exactly that capacity.
    received_packets: SequenceBuffer<ReceivedPacket>,
    // Whether or not we should send an empty ack on the next outgoing packet
    should_send_empty_ack: bool,
}
32
33impl AckManager {
34    pub fn new() -> Self {
35        Self {
36            next_packet_index: 0,
37            last_recv_packet_index: u16::MAX,
38            sent_packets: HashMap::with_capacity(DEFAULT_SEND_PACKETS_SIZE),
39            received_packets: SequenceBuffer::with_capacity(REDUNDANT_PACKET_ACKS_SIZE + 1),
40            should_send_empty_ack: false,
41        }
42    }
43
44    pub fn should_send_empty_ack(&self) -> bool {
45        self.should_send_empty_ack
46    }
47
48    pub fn mark_should_send_empty_ack(&mut self) {
49        self.should_send_empty_ack = true;
50    }
51
52    pub fn clear_should_send_empty_ack(&mut self) {
53        self.should_send_empty_ack = false;
54    }
55
56    /// Get the index of the next outgoing packet
57    pub fn next_sender_packet_index(&self) -> PacketIndex {
58        self.next_packet_index
59    }
60
61    /// Process an incoming packet, handle notifications of delivered / dropped
62    /// packets
63    pub fn process_incoming_header<E: Copy + Eq + Hash + Send + Sync>(
64        &mut self,
65        header: &StandardHeader,
66        message_manager: &mut MessageManager,
67        host_world_manager: &mut HostWorldManager<E>,
68        local_world_manager: &mut LocalWorldManager<E>,
69        packet_notifiables: &mut [&mut dyn PacketNotifiable],
70    ) {
71        let sender_packet_index = header.sender_packet_index;
72        let sender_ack_index = header.sender_ack_index;
73        let mut sender_ack_bitfield = header.sender_ack_bitfield;
74
75        self.received_packets
76            .insert(sender_packet_index, ReceivedPacket {});
77
78        // ensure that `self.sender_ack_index` is always increasing (with
79        // wrapping)
80        if sequence_greater_than(sender_ack_index, self.last_recv_packet_index) {
81            self.last_recv_packet_index = sender_ack_index;
82        }
83
84        // the current `sender_ack_index` was (clearly) received so we should remove it
85        if let Some(sent_packet) = self.sent_packets.get(&sender_ack_index) {
86            if sent_packet.packet_type == PacketType::Data {
87                self.notify_packet_delivered(
88                    sender_ack_index,
89                    message_manager,
90                    host_world_manager,
91                    local_world_manager,
92                    packet_notifiables,
93                );
94            }
95
96            self.sent_packets.remove(&sender_ack_index);
97        }
98
99        // The `sender_ack_bitfield` is going to include whether or not the past 32
100        // packets have been received successfully.
101        // If so, we have no need to resend old packets.
102        for i in 1..=REDUNDANT_PACKET_ACKS_SIZE {
103            let sent_packet_index = sender_ack_index.wrapping_sub(i);
104            if let Some(sent_packet) = self.sent_packets.get(&sent_packet_index) {
105                if sender_ack_bitfield & 1 == 1 {
106                    if sent_packet.packet_type == PacketType::Data {
107                        self.notify_packet_delivered(
108                            sent_packet_index,
109                            message_manager,
110                            host_world_manager,
111                            local_world_manager,
112                            packet_notifiables,
113                        );
114                    }
115
116                    self.sent_packets.remove(&sent_packet_index);
117                } else {
118                    self.sent_packets.remove(&sent_packet_index);
119                }
120            }
121
122            sender_ack_bitfield >>= 1;
123        }
124    }
125
126    /// Records the packet with the given packet index
127    fn track_packet(&mut self, packet_type: PacketType, packet_index: PacketIndex) {
128        self.sent_packets
129            .insert(packet_index, SentPacket { packet_type });
130    }
131
132    /// Bumps the local packet index
133    fn increment_local_packet_index(&mut self) {
134        self.next_packet_index = self.next_packet_index.wrapping_add(1);
135    }
136
137    pub fn next_outgoing_packet_header(&mut self, packet_type: PacketType) -> StandardHeader {
138        let next_packet_index = self.next_sender_packet_index();
139
140        let outgoing = StandardHeader::new(
141            packet_type,
142            next_packet_index,
143            self.last_received_packet_index(),
144            self.ack_bitfield(),
145        );
146
147        self.track_packet(packet_type, next_packet_index);
148        self.increment_local_packet_index();
149
150        outgoing
151    }
152
153    fn notify_packet_delivered<E: Copy + Eq + Hash + Send + Sync>(
154        &self,
155        sent_packet_index: PacketIndex,
156        message_manager: &mut MessageManager,
157        host_world_manager: &mut HostWorldManager<E>,
158        local_world_manager: &mut LocalWorldManager<E>,
159        packet_notifiables: &mut [&mut dyn PacketNotifiable],
160    ) {
161        message_manager.notify_packet_delivered(sent_packet_index);
162        host_world_manager.notify_packet_delivered(sent_packet_index, local_world_manager);
163        for notifiable in packet_notifiables {
164            notifiable.notify_packet_delivered(sent_packet_index);
165        }
166    }
167
168    fn last_received_packet_index(&self) -> PacketIndex {
169        self.received_packets.sequence_num().wrapping_sub(1)
170    }
171
172    fn ack_bitfield(&self) -> u32 {
173        let last_received_remote_packet_index: PacketIndex = self.last_received_packet_index();
174        let mut ack_bitfield: u32 = 0;
175        let mut mask: u32 = 1;
176
177        // iterate the past `REDUNDANT_PACKET_ACKS_SIZE` received packets and set the
178        // corresponding bit for each packet which exists in the buffer.
179        for i in 1..=REDUNDANT_PACKET_ACKS_SIZE {
180            let received_packet_index = last_received_remote_packet_index.wrapping_sub(i);
181            if self.received_packets.exists(received_packet_index) {
182                ack_bitfield |= mask;
183            }
184            mask <<= 1;
185        }
186
187        ack_bitfield
188    }
189}
190
191#[derive(Clone, Debug, Eq, PartialEq)]
192pub struct SentPacket {
193    pub packet_type: PacketType,
194}
195
/// Marker stored in the received-packet buffer for each packet received from the remote
/// host. It carries no data; only its presence at a given index matters.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct ReceivedPacket;