mc_core/world/anvil/
source.rs

1use std::thread::Builder as ThreadBuilder;
2use std::time::{Instant, Duration};
3use std::path::{PathBuf, Path};
4use std::collections::hash_map::Entry;
5use std::collections::HashMap;
6
7use crossbeam_channel::{Sender, Receiver, RecvTimeoutError, unbounded, bounded};
8
9use crate::world::source::{LevelSource, LevelSourceError, ChunkLoadRequest, ProtoChunk, ChunkSaveRequest};
10use crate::util::TimedCache;
11use crate::debug;
12
13use super::region::{RegionFile, RegionResult, RegionError, calc_region_pos};
14use super::decode::{decode_chunk_from_reader};
15use super::encode::{encode_chunk_to_writer};
16
17
18enum Request {
19    Load(ChunkLoadRequest),
20    Save(ChunkSaveRequest)
21}
22
23
24/// A level source that load chunks from anvil region files. This source internally use
25/// a threaded worker to avoid disk access durations overhead. Each opened region file
26/// remains opened for `REGIONS_CACHE_TIME` duration.
27pub struct AnvilLevelSource {
28    request_sender: Sender<Request>,
29    result_receiver: Receiver<Result<ProtoChunk, (LevelSourceError, ChunkLoadRequest)>>
30}
31
32impl AnvilLevelSource {
33
34    pub fn new<P: AsRef<Path>>(dir: P) -> Self {
35
36        let (
37            request_sender,
38            request_receiver
39        ) = unbounded();
40
41        let result_receiver = Worker::new(
42            dir.as_ref().join("region"),
43            request_receiver
44        );
45
46        Self {
47            request_sender,
48            result_receiver
49        }
50
51    }
52
53}
54
55impl LevelSource for AnvilLevelSource {
56
57    fn request_chunk_load(&mut self, req: ChunkLoadRequest) -> Result<(), (LevelSourceError, ChunkLoadRequest)> {
58        // SAFETY: Unwrap should be safe because the channel is unbounded.
59        self.request_sender.send(Request::Load(req)).unwrap();
60        Ok(())
61    }
62
63    fn poll_chunk(&mut self) -> Option<Result<ProtoChunk, (LevelSourceError, ChunkLoadRequest)>> {
64        self.result_receiver.try_recv().ok()
65    }
66
67    fn request_chunk_save(&mut self, req: ChunkSaveRequest) -> Result<(), LevelSourceError> {
68        self.request_sender.send(Request::Save(req)).unwrap();
69        Ok(())
70    }
71
72}
73
74
75const REGIONS_CACHE_TIME: Duration = Duration::from_secs(60);
76const REGIONS_REQUEST_RECV_TIMEOUT: Duration = Duration::from_secs(30);
77
78struct Worker {
79    regions_dir: PathBuf,
80    request_receiver: Receiver<Request>,
81    result_sender: Sender<Result<ProtoChunk, (LevelSourceError, ChunkLoadRequest)>>,
82    regions: HashMap<(i32, i32), TimedCache<RegionFile>>,
83    last_cache_check: Instant
84}
85
86impl Worker {
87
88    /// Internal constructor for worker, you must give the regions directory, not level directory.
89    fn new(
90        regions_dir: PathBuf,
91        request_receiver: Receiver<Request>
92    ) -> Receiver<Result<ProtoChunk, (LevelSourceError, ChunkLoadRequest)>> {
93
94        let (
95            result_sender,
96            result_receiver
97        ) = bounded(128);
98
99        let worker = Self {
100            regions_dir,
101            request_receiver,
102            result_sender,
103            regions: HashMap::new(),
104            last_cache_check: Instant::now()
105        };
106
107        ThreadBuilder::new()
108            .name("Anvil level source worker".into())
109            .spawn(move || worker.run())
110            .expect("Failed to create anvil level source worker thread.");
111
112        result_receiver
113
114    }
115
116    fn run(mut self) {
117
118        loop {
119
120            match self.request_receiver.recv_timeout(REGIONS_REQUEST_RECV_TIMEOUT) {
121                Ok(Request::Load(req)) => {
122                    // debug!("Received chunk load request for {}/{}", req.cx, req.cz);
123                    let chunk = self.load_chunk(req);
124                    if let Err(_) = self.result_sender.send(chunk) {
125                        // debug!("Failed to send result.");
126                        break
127                    }
128                }
129                Ok(Request::Save(req)) => {
130                    debug!("Received chunk save request for {}/{}", req.cx, req.cz);
131                    self.save_chunk(req);
132                }
133                Err(RecvTimeoutError::Timeout) => {},
134                Err(RecvTimeoutError::Disconnected) => break
135            }
136
137            self.check_cache();
138
139        }
140
141    }
142
143    fn access_region(&mut self, rx: i32, rz: i32, create: bool) -> RegionResult<&mut TimedCache<RegionFile>> {
144        match self.regions.entry((rx, rz)) {
145            Entry::Occupied(o) => Ok(o.into_mut().cache_update()),
146            Entry::Vacant(v) => {
147                // debug!("Try opening region file at {}/{}", rx, rz);
148                let region = RegionFile::new(self.regions_dir.clone(), rx, rz, create)?;
149                debug!("Region file opened at {}/{}", rx, rz);
150                Ok(v.insert(TimedCache::new(region, REGIONS_CACHE_TIME)))
151            }
152        }
153    }
154
155    fn load_chunk(&mut self, req: ChunkLoadRequest) -> Result<ProtoChunk, (LevelSourceError, ChunkLoadRequest)> {
156
157        let (rx, rz) = calc_region_pos(req.cx, req.cz);
158        let region = match self.access_region(rx, rz, false) {
159            Ok(region) => region,
160            Err(RegionError::FileNotFound(_)) => return Err((LevelSourceError::UnsupportedChunkPosition, req)),
161            Err(e) => return Err((LevelSourceError::new_custom(e), req))
162        };
163
164        let mut reader = match region.get_chunk_reader(req.cx, req.cz) {
165            Ok(reader) => reader,
166            // If the chunk is empty, just return an unsupported chunk pos error, this is used to
167            // delegate to the generator in case of LoadOrGen source.
168            Err(RegionError::EmptyChunk) => return Err((LevelSourceError::UnsupportedChunkPosition, req)),
169            Err(err) => return Err((LevelSourceError::new_custom(err), req))
170        };
171
172        let mut chunk = req.build_proto_chunk();
173
174        match decode_chunk_from_reader(&mut reader, &mut chunk) {
175            Ok(_) => Ok(chunk),
176            Err(err) => Err((LevelSourceError::new_custom(err), req))
177        }
178
179    }
180
181    fn save_chunk(&mut self, req: ChunkSaveRequest) {
182
183        let chunk = req.chunk.read().unwrap();
184        let (cx, cz) = chunk.get_position();
185        let (rx, rz) = calc_region_pos(cx, cz);
186
187        let region = match self.access_region(rx, rz, true) {
188            Ok(region) => region,
189            Err(_) => return
190        };
191
192        let mut writer = region.get_chunk_writer(cx, cz, Default::default());
193        encode_chunk_to_writer(&mut writer, &*chunk);
194        writer.write_chunk().unwrap();
195        // debug!("Chunk at {}/{} saved", cx, cz);
196
197    }
198
199    fn check_cache(&mut self) {
200        if self.last_cache_check.elapsed() >= REGIONS_CACHE_TIME {
201            self.regions.retain(|(rx, rz), region| {
202                if region.is_cache_timed_out() {
203                    debug!("Region file timed out at {}/{}", rx, rz);
204                    false
205                } else {
206                    true
207                }
208            });
209            self.last_cache_check = Instant::now();
210        }
211    }
212
213}