luanti_protocol/peer/
channel.rs

1use std::{collections::VecDeque, time::Instant};
2
3use anyhow::Result;
4use tokio::sync::mpsc::UnboundedSender;
5
6use crate::{
7    commands::Command,
8    types::ProtocolContext,
9    wire::{
10        deser::{Deserialize, Deserializer},
11        packet::{ControlBody, InnerBody, PacketBody, ReliableBody},
12    },
13};
14
15use super::{ReliableReceiver, ReliableSender, SplitReceiver, SplitSender};
16
17pub(crate) struct Channel {
18    unreliable_out: VecDeque<InnerBody>,
19
20    reliable_in: ReliableReceiver,
21    reliable_out: ReliableSender,
22
23    split_in: SplitReceiver,
24    split_out: SplitSender,
25
26    to_controller: UnboundedSender<Result<Command>>,
27    now: Instant,
28    recv_context: ProtocolContext,
29    send_context: ProtocolContext,
30}
31
32impl Channel {
33    pub(crate) fn new(
34        remote_is_server: bool,
35        to_controller: UnboundedSender<Result<Command>>,
36    ) -> Self {
37        Self {
38            unreliable_out: VecDeque::new(),
39            reliable_in: ReliableReceiver::new(),
40            reliable_out: ReliableSender::new(),
41            split_in: SplitReceiver::new(),
42            split_out: SplitSender::new(),
43            to_controller,
44            now: Instant::now(),
45            recv_context: ProtocolContext::latest_for_receive(remote_is_server),
46            send_context: ProtocolContext::latest_for_send(remote_is_server),
47        }
48    }
49
50    pub(crate) fn update_now(&mut self, now: &Instant) {
51        self.now = *now;
52    }
53
54    pub(crate) fn update_context(
55        &mut self,
56        recv_context: ProtocolContext,
57        send_context: ProtocolContext,
58    ) {
59        self.recv_context = recv_context;
60        self.send_context = send_context;
61    }
62
63    /// Process a packet received from remote
64    /// Possibly dispatching one or more Commands
65    pub(crate) fn process(&mut self, body: PacketBody) -> Result<()> {
66        match body {
67            PacketBody::Reliable(rb) => self.process_reliable(rb)?,
68            PacketBody::Inner(ib) => self.process_inner(ib)?,
69        }
70        Ok(())
71    }
72
73    pub(crate) fn process_reliable(&mut self, body: ReliableBody) -> Result<()> {
74        self.reliable_in.push(body);
75        while let Some(inner) = self.reliable_in.pop() {
76            self.process_inner(inner)?;
77        }
78        Ok(())
79    }
80
81    pub(crate) fn process_inner(&mut self, body: InnerBody) -> Result<()> {
82        match body {
83            InnerBody::Control(body) => self.process_control(body),
84            InnerBody::Original(body) => {
85                if let Some(command) = body.command {
86                    self.process_command(command);
87                }
88            }
89            InnerBody::Split(body) => {
90                if let Some(payload) = self.split_in.push(self.now, body)? {
91                    let mut buf = Deserializer::new(self.recv_context, &payload);
92                    if let Some(command) = Command::deserialize(&mut buf)? {
93                        self.process_command(command);
94                    }
95                }
96            }
97        }
98        Ok(())
99    }
100
101    pub(crate) fn process_control(&mut self, body: ControlBody) {
102        match body {
103            ControlBody::Ack(ack) => {
104                self.reliable_out.process_ack(&ack);
105            }
106            // Everything else is handled one level up
107            _ => (),
108        }
109    }
110
111    pub(crate) fn process_command(&mut self, command: Command) {
112        match self.to_controller.send(Ok(command)) {
113            Ok(()) => (),
114            Err(error) => panic!("Unexpected command channel shutdown: {error:?}"),
115        }
116    }
117
118    /// Send command to remote
119    pub(crate) fn send(&mut self, reliable: bool, command: Command) -> Result<()> {
120        let bodies = self.split_out.push(self.send_context, command)?;
121        for body in bodies {
122            self.send_inner(reliable, body);
123        }
124        Ok(())
125    }
126
127    pub(crate) fn send_inner(&mut self, reliable: bool, body: InnerBody) {
128        if reliable {
129            self.reliable_out.push(body);
130        } else {
131            self.unreliable_out.push_back(body);
132        }
133    }
134
135    /// Check if the channel has anything ready to send.
136    pub(crate) fn next_send(&mut self, now: Instant) -> Option<PacketBody> {
137        if let Some(body) = self.unreliable_out.pop_front() {
138            return Some(PacketBody::Inner(body));
139        };
140        if let Some(body) = self.reliable_out.pop(now) {
141            return Some(body);
142        }
143        None
144    }
145
146    /// Only call after exhausting `next_send()`
147    pub(crate) fn next_timeout(&mut self) -> Option<Instant> {
148        self.reliable_out.next_timeout()
149    }
150}