1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
use amethyst::ecs::{Entities, System};
use crossbeam_channel::Sender;
use std::io;
use std::net::UdpSocket;
use std::str;
use crate::types::{ComponentMap, EntityMessage, IncomingComponent, IncomingMessage, ResourceMap};
/// The system in charge of reading and dispatching incoming messages from
/// the editor.
///
/// On every run the system reads all pending packets from `socket` into `incoming_buffer`,
/// then decodes each complete message found in the buffer and forwards it over the matching
/// channel.
pub struct EditorReceiverSystem {
// UDP socket that packets from the editor are read from (via `recv_from`).
socket: UdpSocket,
// Maps containing channels used to send incoming serialized component/resource data from the
// editor. Incoming data is sent to specialized systems that deserialize the data and update
// the corresponding local data.
//
// `component_map` is looked up by the component ID carried in a component update; the sender
// found there receives an `IncomingComponent` holding the target entity and the raw data.
component_map: ComponentMap,
// `resource_map` is looked up by the resource ID carried in a resource update; the sender
// found there receives the raw data.
resource_map: ResourceMap,
// Channel used to forward entity creation and destruction requests (`EntityMessage`).
entity_handler: Sender<EntityMessage>,
// Bytes received from the editor that have not been consumed yet. Messages in this buffer
// are terminated by a `0xC` byte; anything after the last terminator is an incomplete
// message that is kept until the rest of it arrives.
incoming_buffer: Vec<u8>,
}
impl EditorReceiverSystem {
pub fn new(
component_map: ComponentMap,
resource_map: ResourceMap,
entity_handler: Sender<EntityMessage>,
socket: UdpSocket,
) -> EditorReceiverSystem {
// Create the socket used for communicating with the editor.
//
// NOTE: We set the socket to nonblocking so that we don't block if there are no incoming
// messages to read. We `expect` on the call to `set_nonblocking` because the game will
// hang if the socket is still set to block when the game runs.
EditorReceiverSystem {
socket,
component_map,
resource_map,
entity_handler,
incoming_buffer: Vec::with_capacity(1024),
}
}
}
impl<'a> System<'a> for EditorReceiverSystem {
type SystemData = Entities<'a>;
fn run(&mut self, entities: Self::SystemData) {
let editor_address = ([127, 0, 0, 1], 8000).into();
// Read any incoming messages from the editor process.
let mut buf = [0; 1024];
loop {
// TODO: Verify that the incoming address matches the editor process address.
let (bytes_read, addr) = match self.socket.recv_from(&mut buf[..]) {
Ok(res) => res,
Err(error) => {
match error.kind() {
// If the read would block, it means that there was no incoming data and we
// should break from the loop.
io::ErrorKind::WouldBlock => break,
// This is an "error" that happens on Windows if no editor is running to
// receive the state update we just sent. The OS gives a "connection was
// forcibly closed" error when no socket receives the message, but we
// don't care if that happens (in fact, we use UDP specifically so that
// we can broadcast messages without worrying about establishing a
// connection).
io::ErrorKind::ConnectionReset => continue,
// All other error kinds should be indicative of a genuine error. For our
// purposes we still want to ignore them, but we'll at least log a warning
// in case it helps debug an issue.
_ => {
warn!("Error reading incoming: {:?}", error);
continue;
}
}
}
};
if addr != editor_address {
trace!("Packet received from unknown address {:?}", addr);
continue;
}
debug!("Packet: {:?}", &buf[..bytes_read]);
// Add the bytes from the incoming packet to the buffer.
self.incoming_buffer.extend_from_slice(&buf[..bytes_read]);
}
// Check the incoming buffer to see if any completed messages have been received.
while let Some(index) = self.incoming_buffer.iter().position(|&byte| byte == 0xC) {
// HACK: Manually introduce a scope here so that the compiler can tell when we're done
// using borrowing the message bytes from `self.incoming_buffer`. This can be removed
// once NLL is stable.
{
let message_bytes = &self.incoming_buffer[..index];
let result = str::from_utf8(message_bytes)
.ok()
.and_then(|message| serde_json::from_str(message).ok());
debug!("Message str: {:?}", result);
if let Some(message) = result {
debug!("Message: {:#?}", message);
match message {
IncomingMessage::ComponentUpdate {
id,
entity: entity_data,
data,
} => {
let entity = entities.entity(entity_data.id);
// Skip the update if the entity is no longer valid.
if entity.gen().id() != entity_data.generation {
debug!(
"Entity {:?} had invalid generation {} (expected {})",
entity_data,
entity_data.generation,
entity.gen().id()
);
continue;
}
if let Some(sender) = self.component_map.get(&*id) {
// TODO: Should we do something to prevent this from blocking?
sender
.send(IncomingComponent { entity, data })
.expect("Disconnected from component system");
} else {
debug!("No deserializer found for component {:?}", id);
}
}
IncomingMessage::ResourceUpdate { id, data } => {
// TODO: Should we do something if there was no deserialer system for the
// specified ID?
if let Some(sender) = self.resource_map.get(&*id) {
// TODO: Should we do something to prevent this from blocking?
sender
.send(data)
.expect("Disconnected from resource system");
}
}
IncomingMessage::CreateEntities { amount } => {
self.entity_handler
.send(EntityMessage::Create(amount))
.expect("Disconnected from entity handler system");
}
IncomingMessage::DestroyEntities { entities } => {
self.entity_handler
.send(EntityMessage::Destroy(
entities.iter().map(|e| e.id).collect(),
))
.expect("Disconnected from entity handler system");
}
}
}
}
// Remove the message bytes from the beginning of the incoming buffer.
self.incoming_buffer.drain(..=index);
}
}
}