luanti_protocol/peer/
channel.rs1use std::{collections::VecDeque, time::Instant};
2
3use anyhow::Result;
4use tokio::sync::mpsc::UnboundedSender;
5
6use crate::{
7 commands::Command,
8 types::ProtocolContext,
9 wire::{
10 deser::{Deserialize, Deserializer},
11 packet::{ControlBody, InnerBody, PacketBody, ReliableBody},
12 },
13};
14
15use super::{ReliableReceiver, ReliableSender, SplitReceiver, SplitSender};
16
17pub(crate) struct Channel {
18 unreliable_out: VecDeque<InnerBody>,
19
20 reliable_in: ReliableReceiver,
21 reliable_out: ReliableSender,
22
23 split_in: SplitReceiver,
24 split_out: SplitSender,
25
26 to_controller: UnboundedSender<Result<Command>>,
27 now: Instant,
28 recv_context: ProtocolContext,
29 send_context: ProtocolContext,
30}
31
32impl Channel {
33 pub(crate) fn new(
34 remote_is_server: bool,
35 to_controller: UnboundedSender<Result<Command>>,
36 ) -> Self {
37 Self {
38 unreliable_out: VecDeque::new(),
39 reliable_in: ReliableReceiver::new(),
40 reliable_out: ReliableSender::new(),
41 split_in: SplitReceiver::new(),
42 split_out: SplitSender::new(),
43 to_controller,
44 now: Instant::now(),
45 recv_context: ProtocolContext::latest_for_receive(remote_is_server),
46 send_context: ProtocolContext::latest_for_send(remote_is_server),
47 }
48 }
49
50 pub(crate) fn update_now(&mut self, now: &Instant) {
51 self.now = *now;
52 }
53
54 pub(crate) fn update_context(
55 &mut self,
56 recv_context: ProtocolContext,
57 send_context: ProtocolContext,
58 ) {
59 self.recv_context = recv_context;
60 self.send_context = send_context;
61 }
62
63 pub(crate) fn process(&mut self, body: PacketBody) -> Result<()> {
66 match body {
67 PacketBody::Reliable(rb) => self.process_reliable(rb)?,
68 PacketBody::Inner(ib) => self.process_inner(ib)?,
69 }
70 Ok(())
71 }
72
73 pub(crate) fn process_reliable(&mut self, body: ReliableBody) -> Result<()> {
74 self.reliable_in.push(body);
75 while let Some(inner) = self.reliable_in.pop() {
76 self.process_inner(inner)?;
77 }
78 Ok(())
79 }
80
81 pub(crate) fn process_inner(&mut self, body: InnerBody) -> Result<()> {
82 match body {
83 InnerBody::Control(body) => self.process_control(body),
84 InnerBody::Original(body) => {
85 if let Some(command) = body.command {
86 self.process_command(command);
87 }
88 }
89 InnerBody::Split(body) => {
90 if let Some(payload) = self.split_in.push(self.now, body)? {
91 let mut buf = Deserializer::new(self.recv_context, &payload);
92 if let Some(command) = Command::deserialize(&mut buf)? {
93 self.process_command(command);
94 }
95 }
96 }
97 }
98 Ok(())
99 }
100
101 pub(crate) fn process_control(&mut self, body: ControlBody) {
102 match body {
103 ControlBody::Ack(ack) => {
104 self.reliable_out.process_ack(&ack);
105 }
106 _ => (),
108 }
109 }
110
111 pub(crate) fn process_command(&mut self, command: Command) {
112 match self.to_controller.send(Ok(command)) {
113 Ok(()) => (),
114 Err(error) => panic!("Unexpected command channel shutdown: {error:?}"),
115 }
116 }
117
118 pub(crate) fn send(&mut self, reliable: bool, command: Command) -> Result<()> {
120 let bodies = self.split_out.push(self.send_context, command)?;
121 for body in bodies {
122 self.send_inner(reliable, body);
123 }
124 Ok(())
125 }
126
127 pub(crate) fn send_inner(&mut self, reliable: bool, body: InnerBody) {
128 if reliable {
129 self.reliable_out.push(body);
130 } else {
131 self.unreliable_out.push_back(body);
132 }
133 }
134
135 pub(crate) fn next_send(&mut self, now: Instant) -> Option<PacketBody> {
137 if let Some(body) = self.unreliable_out.pop_front() {
138 return Some(PacketBody::Inner(body));
139 };
140 if let Some(body) = self.reliable_out.pop(now) {
141 return Some(body);
142 }
143 None
144 }
145
146 pub(crate) fn next_timeout(&mut self) -> Option<Instant> {
148 self.reliable_out.next_timeout()
149 }
150}