1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
use std::{
fs,
sync::{Arc, Mutex},
};
use crossbeam_channel::{select, Receiver, RecvError, Sender};
use jod_thread::JoinHandle;
use memofs::{IoResultExt, Vfs, VfsEvent};
use rbx_dom_weak::types::{Ref, Variant};
use crate::{
message_queue::MessageQueue,
snapshot::{
apply_patch_set, compute_patch_set, AppliedPatchSet, InstigatingSource, PatchSet, RojoTree,
},
snapshot_middleware::{snapshot_from_vfs, snapshot_project_node},
};
/// Processes file change events, updates the DOM, and sends those updates
/// through a channel for other stuff to consume.
///
/// Owns the connection between Rojo's VFS and its DOM by holding onto another
/// thread that processes messages.
///
/// Consumers of ChangeProcessor, like ServeSession, are intended to communicate
/// with this object via channels.
///
/// ChangeProcessor expects to be the only writer to the RojoTree and Vfs
/// objects passed to it.
pub struct ChangeProcessor {
/// Controls the runtime of the processor thread. When signaled, the job
/// thread will finish its current work and terminate.
///
/// This channel should be signaled before dropping ChangeProcessor or we'll
/// hang forever waiting for the message processing loop to terminate.
shutdown_sender: Sender<()>,
/// A handle to the message processing thread. When dropped, we'll block
/// until it's done.
///
/// Allowed to be unused because dropping this value has side effects.
#[allow(unused)]
job_thread: JoinHandle<Result<(), RecvError>>,
}
impl ChangeProcessor {
/// Spin up the ChangeProcessor, connecting it to the given tree, VFS, and
/// outbound message queue.
pub fn start(
tree: Arc<Mutex<RojoTree>>,
vfs: Arc<Vfs>,
message_queue: Arc<MessageQueue<AppliedPatchSet>>,
tree_mutation_receiver: Receiver<PatchSet>,
) -> Self {
let (shutdown_sender, shutdown_receiver) = crossbeam_channel::bounded(1);
let vfs_receiver = vfs.event_receiver();
let task = JobThreadContext {
tree,
vfs,
message_queue,
};
let job_thread = jod_thread::Builder::new()
.name("ChangeProcessor thread".to_owned())
.spawn(move || {
log::trace!("ChangeProcessor thread started");
loop {
select! {
recv(vfs_receiver) -> event => {
task.handle_vfs_event(event?);
},
recv(tree_mutation_receiver) -> patch_set => {
task.handle_tree_event(patch_set?);
},
recv(shutdown_receiver) -> _ => {
log::trace!("ChangeProcessor shutdown signal received...");
return Ok(());
},
}
}
})
.expect("Could not start ChangeProcessor thread");
Self {
shutdown_sender,
job_thread,
}
}
}
impl Drop for ChangeProcessor {
fn drop(&mut self) {
// Signal the job thread to start spinning down. Without this we'll hang
// forever waiting for the thread to finish its infinite loop.
let _ = self.shutdown_sender.send(());
// After this function ends, the job thread will be joined. It might
// block for a small period of time while it processes its last work.
}
}
/// Contains all of the state needed to synchronize the DOM and VFS.
struct JobThreadContext {
/// A handle to the DOM we're managing.
tree: Arc<Mutex<RojoTree>>,
/// A handle to the VFS we're managing.
vfs: Arc<Vfs>,
/// Whenever changes are applied to the DOM, we should push those changes
/// into this message queue to inform any connected clients.
message_queue: Arc<MessageQueue<AppliedPatchSet>>,
}
impl JobThreadContext {
fn handle_vfs_event(&self, event: VfsEvent) {
log::trace!("Vfs event: {:?}", event);
// Update the VFS immediately with the event.
self.vfs
.commit_event(&event)
.expect("Error applying VFS change");
// For a given VFS event, we might have many changes to different parts
// of the tree. Calculate and apply all of these changes.
let applied_patches = match event {
VfsEvent::Create(path) | VfsEvent::Remove(path) | VfsEvent::Write(path) => {
let mut tree = self.tree.lock().unwrap();
let mut applied_patches = Vec::new();
// Find the nearest ancestor to this path that has
// associated instances in the tree. This helps make sure
// that we handle additions correctly, especially if we
// receive events for descendants of a large tree being
// created all at once.
let mut current_path = path.as_path();
let affected_ids = loop {
let ids = tree.get_ids_at_path(current_path);
log::trace!("Path {} affects IDs {:?}", current_path.display(), ids);
if !ids.is_empty() {
break ids.to_vec();
}
log::trace!("Trying parent path...");
match current_path.parent() {
Some(parent) => current_path = parent,
None => break Vec::new(),
}
};
for id in affected_ids {
if let Some(patch) = compute_and_apply_changes(&mut tree, &self.vfs, id) {
if !patch.is_empty() {
applied_patches.push(patch);
}
}
}
applied_patches
}
_ => {
log::warn!("Unhandled VFS event: {:?}", event);
Vec::new()
}
};
// Notify anyone listening to the message queue about the changes we
// just made.
self.message_queue.push_messages(&applied_patches);
}
fn handle_tree_event(&self, patch_set: PatchSet) {
log::trace!("Applying PatchSet from client: {:#?}", patch_set);
let applied_patch = {
let mut tree = self.tree.lock().unwrap();
for &id in &patch_set.removed_instances {
if let Some(instance) = tree.get_instance(id) {
if let Some(instigating_source) = &instance.metadata().instigating_source {
match instigating_source {
InstigatingSource::Path(path) => fs::remove_file(path).unwrap(),
InstigatingSource::ProjectNode(_, _, _, _) => {
log::warn!(
"Cannot remove instance {:?}, it's from a project file",
id
);
}
}
} else {
// TODO
log::warn!(
"Cannot remove instance {:?}, it is not an instigating source.",
id
);
}
} else {
log::warn!("Cannot remove instance {:?}, it does not exist.", id);
}
}
for update in &patch_set.updated_instances {
let id = update.id;
if let Some(instance) = tree.get_instance(id) {
if update.changed_name.is_some() {
log::warn!("Cannot rename instances yet.");
}
if update.changed_class_name.is_some() {
log::warn!("Cannot change ClassName yet.");
}
if update.changed_metadata.is_some() {
log::warn!("Cannot change metadata yet.");
}
for (key, changed_value) in &update.changed_properties {
if key == "Source" {
if let Some(instigating_source) =
&instance.metadata().instigating_source
{
match instigating_source {
InstigatingSource::Path(path) => {
if let Some(Variant::String(value)) = changed_value {
fs::write(path, value).unwrap();
} else {
log::warn!("Cannot change Source to non-string value.");
}
}
InstigatingSource::ProjectNode(_, _, _, _) => {
log::warn!(
"Cannot remove instance {:?}, it's from a project file",
id
);
}
}
} else {
log::warn!(
"Cannot update instance {:?}, it is not an instigating source.",
id
);
}
} else {
log::warn!("Cannot change properties besides BaseScript.Source.");
}
}
} else {
log::warn!("Cannot update instance {:?}, it does not exist.", id);
}
}
apply_patch_set(&mut tree, patch_set)
};
if !applied_patch.is_empty() {
self.message_queue.push_messages(&[applied_patch]);
}
}
}
fn compute_and_apply_changes(tree: &mut RojoTree, vfs: &Vfs, id: Ref) -> Option<AppliedPatchSet> {
let metadata = tree
.get_metadata(id)
.expect("metadata missing for instance present in tree");
let instigating_source = match &metadata.instigating_source {
Some(path) => path,
None => {
log::error!(
"Instance {:?} did not have an instigating source, but was considered for an update.",
id
);
log::error!("This is a bug. Please file an issue!");
return None;
}
};
// How we process a file change event depends on what created this
// file/folder in the first place.
let applied_patch_set = match instigating_source {
InstigatingSource::Path(path) => match vfs.metadata(path).with_not_found() {
Ok(Some(_)) => {
// Our instance was previously created from a path and that
// path still exists. We can generate a snapshot starting at
// that path and use it as the source for our patch.
let snapshot = match snapshot_from_vfs(&metadata.context, vfs, path) {
Ok(snapshot) => snapshot,
Err(err) => {
log::error!("Snapshot error: {:?}", err);
return None;
}
};
let patch_set = compute_patch_set(snapshot, tree, id);
apply_patch_set(tree, patch_set)
}
Ok(None) => {
// Our instance was previously created from a path, but that
// path no longer exists.
//
// We associate deleting the instigating file for an
// instance with deleting that instance.
let mut patch_set = PatchSet::new();
patch_set.removed_instances.push(id);
apply_patch_set(tree, patch_set)
}
Err(err) => {
log::error!("Error processing filesystem change: {:?}", err);
return None;
}
},
InstigatingSource::ProjectNode(project_path, instance_name, project_node, parent_class) => {
// This instance is the direct subject of a project node. Since
// there might be information associated with our instance from
// the project file, we snapshot the entire project node again.
let snapshot_result = snapshot_project_node(
&metadata.context,
project_path,
instance_name,
project_node,
vfs,
parent_class.as_ref().map(|name| name.as_str()),
);
let snapshot = match snapshot_result {
Ok(snapshot) => snapshot,
Err(err) => {
log::error!("{:?}", err);
return None;
}
};
let patch_set = compute_patch_set(snapshot, tree, id);
apply_patch_set(tree, patch_set)
}
};
Some(applied_patch_set)
}