1use std::thread::Builder as ThreadBuilder;
2use std::time::{Instant, Duration};
3use std::path::{PathBuf, Path};
4use std::collections::hash_map::Entry;
5use std::collections::HashMap;
6
7use crossbeam_channel::{Sender, Receiver, RecvTimeoutError, unbounded, bounded};
8
9use crate::world::source::{LevelSource, LevelSourceError, ChunkLoadRequest, ProtoChunk, ChunkSaveRequest};
10use crate::util::TimedCache;
11use crate::debug;
12
13use super::region::{RegionFile, RegionResult, RegionError, calc_region_pos};
14use super::decode::{decode_chunk_from_reader};
15use super::encode::{encode_chunk_to_writer};
16
17
18enum Request {
19 Load(ChunkLoadRequest),
20 Save(ChunkSaveRequest)
21}
22
23
24pub struct AnvilLevelSource {
28 request_sender: Sender<Request>,
29 result_receiver: Receiver<Result<ProtoChunk, (LevelSourceError, ChunkLoadRequest)>>
30}
31
32impl AnvilLevelSource {
33
34 pub fn new<P: AsRef<Path>>(dir: P) -> Self {
35
36 let (
37 request_sender,
38 request_receiver
39 ) = unbounded();
40
41 let result_receiver = Worker::new(
42 dir.as_ref().join("region"),
43 request_receiver
44 );
45
46 Self {
47 request_sender,
48 result_receiver
49 }
50
51 }
52
53}
54
55impl LevelSource for AnvilLevelSource {
56
57 fn request_chunk_load(&mut self, req: ChunkLoadRequest) -> Result<(), (LevelSourceError, ChunkLoadRequest)> {
58 self.request_sender.send(Request::Load(req)).unwrap();
60 Ok(())
61 }
62
63 fn poll_chunk(&mut self) -> Option<Result<ProtoChunk, (LevelSourceError, ChunkLoadRequest)>> {
64 self.result_receiver.try_recv().ok()
65 }
66
67 fn request_chunk_save(&mut self, req: ChunkSaveRequest) -> Result<(), LevelSourceError> {
68 self.request_sender.send(Request::Save(req)).unwrap();
69 Ok(())
70 }
71
72}
73
74
/// Lifetime given to each cached region file when it is opened, and also the
/// minimum interval between two sweeps of the region cache.
const REGIONS_CACHE_TIME: Duration = Duration::from_secs(60);

/// Maximum time the worker waits for a request before waking up anyway, so
/// that the region cache can still be checked while the worker is idle.
const REGIONS_REQUEST_RECV_TIMEOUT: Duration = Duration::from_secs(30);
77
78struct Worker {
79 regions_dir: PathBuf,
80 request_receiver: Receiver<Request>,
81 result_sender: Sender<Result<ProtoChunk, (LevelSourceError, ChunkLoadRequest)>>,
82 regions: HashMap<(i32, i32), TimedCache<RegionFile>>,
83 last_cache_check: Instant
84}
85
86impl Worker {
87
88 fn new(
90 regions_dir: PathBuf,
91 request_receiver: Receiver<Request>
92 ) -> Receiver<Result<ProtoChunk, (LevelSourceError, ChunkLoadRequest)>> {
93
94 let (
95 result_sender,
96 result_receiver
97 ) = bounded(128);
98
99 let worker = Self {
100 regions_dir,
101 request_receiver,
102 result_sender,
103 regions: HashMap::new(),
104 last_cache_check: Instant::now()
105 };
106
107 ThreadBuilder::new()
108 .name("Anvil level source worker".into())
109 .spawn(move || worker.run())
110 .expect("Failed to create anvil level source worker thread.");
111
112 result_receiver
113
114 }
115
116 fn run(mut self) {
117
118 loop {
119
120 match self.request_receiver.recv_timeout(REGIONS_REQUEST_RECV_TIMEOUT) {
121 Ok(Request::Load(req)) => {
122 let chunk = self.load_chunk(req);
124 if let Err(_) = self.result_sender.send(chunk) {
125 break
127 }
128 }
129 Ok(Request::Save(req)) => {
130 debug!("Received chunk save request for {}/{}", req.cx, req.cz);
131 self.save_chunk(req);
132 }
133 Err(RecvTimeoutError::Timeout) => {},
134 Err(RecvTimeoutError::Disconnected) => break
135 }
136
137 self.check_cache();
138
139 }
140
141 }
142
143 fn access_region(&mut self, rx: i32, rz: i32, create: bool) -> RegionResult<&mut TimedCache<RegionFile>> {
144 match self.regions.entry((rx, rz)) {
145 Entry::Occupied(o) => Ok(o.into_mut().cache_update()),
146 Entry::Vacant(v) => {
147 let region = RegionFile::new(self.regions_dir.clone(), rx, rz, create)?;
149 debug!("Region file opened at {}/{}", rx, rz);
150 Ok(v.insert(TimedCache::new(region, REGIONS_CACHE_TIME)))
151 }
152 }
153 }
154
155 fn load_chunk(&mut self, req: ChunkLoadRequest) -> Result<ProtoChunk, (LevelSourceError, ChunkLoadRequest)> {
156
157 let (rx, rz) = calc_region_pos(req.cx, req.cz);
158 let region = match self.access_region(rx, rz, false) {
159 Ok(region) => region,
160 Err(RegionError::FileNotFound(_)) => return Err((LevelSourceError::UnsupportedChunkPosition, req)),
161 Err(e) => return Err((LevelSourceError::new_custom(e), req))
162 };
163
164 let mut reader = match region.get_chunk_reader(req.cx, req.cz) {
165 Ok(reader) => reader,
166 Err(RegionError::EmptyChunk) => return Err((LevelSourceError::UnsupportedChunkPosition, req)),
169 Err(err) => return Err((LevelSourceError::new_custom(err), req))
170 };
171
172 let mut chunk = req.build_proto_chunk();
173
174 match decode_chunk_from_reader(&mut reader, &mut chunk) {
175 Ok(_) => Ok(chunk),
176 Err(err) => Err((LevelSourceError::new_custom(err), req))
177 }
178
179 }
180
181 fn save_chunk(&mut self, req: ChunkSaveRequest) {
182
183 let chunk = req.chunk.read().unwrap();
184 let (cx, cz) = chunk.get_position();
185 let (rx, rz) = calc_region_pos(cx, cz);
186
187 let region = match self.access_region(rx, rz, true) {
188 Ok(region) => region,
189 Err(_) => return
190 };
191
192 let mut writer = region.get_chunk_writer(cx, cz, Default::default());
193 encode_chunk_to_writer(&mut writer, &*chunk);
194 writer.write_chunk().unwrap();
195 }
198
199 fn check_cache(&mut self) {
200 if self.last_cache_check.elapsed() >= REGIONS_CACHE_TIME {
201 self.regions.retain(|(rx, rz), region| {
202 if region.is_cache_timed_out() {
203 debug!("Region file timed out at {}/{}", rx, rz);
204 false
205 } else {
206 true
207 }
208 });
209 self.last_cache_check = Instant::now();
210 }
211 }
212
213}