1#![warn(missing_docs)]
4
5use std::{
6 path::{Path, PathBuf},
7 pin::Pin,
8 sync::Arc,
9 time::Duration,
10};
11
12use bytes::{BufMut, BytesMut};
13use dashmap::DashSet;
14use dmds::IoHandle;
15use tokio::{
16 fs::File,
17 io::{AsyncReadExt, BufReader},
18};
19
20#[cfg(test)]
21mod tests;
22
23#[derive(Debug)]
25pub struct FsHandle {
26 root: PathBuf,
28 flat: bool,
30
31 invalid_chunks: DashSet<Box<[usize]>>,
33}
34
35impl IoHandle for FsHandle {
36 type Read<'a> = FsReader<'a> where Self: 'a;
37
38 #[inline]
39 fn hint_is_valid(&self, pos: &[usize]) -> bool {
40 !self.invalid_chunks.contains(pos)
41 }
42
43 async fn read_chunk<const DIMS: usize>(
44 &self,
45 pos: [usize; DIMS],
46 ) -> std::io::Result<(u32, Self::Read<'_>)> {
47 let path = self.path(&pos);
48 let result = File::open(&path).await;
49
50 if let Err(ref err) = result {
51 if err.kind() == std::io::ErrorKind::NotFound {
52 if self.invalid_chunks.len() > Self::IC_CAPACITY {
53 self.invalid_chunks.clear();
54 }
55 self.invalid_chunks.insert(Box::new(pos));
56 }
57 }
58 let mut file = BufReader::new(result?);
59 let mut buf = [0_u8; 4];
60 file.read_exact(&mut buf).await?;
61
62 Ok((
63 u32::from_be_bytes(buf),
64 FsReader {
65 _handle: self,
66 file,
67 },
68 ))
69 }
70}
71
72impl FsHandle {
73 const IC_CAPACITY: usize = 1024 * 10;
74
75 pub fn new<P: AsRef<Path>>(root: P, flat: bool) -> Self {
77 Self {
78 root: root.as_ref().to_owned(),
79 flat,
80 invalid_chunks: DashSet::new(),
81 }
82 }
83
84 pub async fn write_chunk<const DIMS: usize, T: dmds::Data>(
86 &self,
87 chunk: &dmds::Chunk<T, DIMS>,
88 ) -> std::io::Result<()> {
89 let mut buf = BytesMut::new();
90 buf.put_u32(T::VERSION);
91 chunk.write_buf(&mut buf).await?;
92 let buf = buf.freeze();
93
94 let path = self.path(chunk.pos());
95 let mut path_pop = path.clone();
96 path_pop.pop();
97 let path_pop = path_pop;
98 if tokio::fs::read_dir(&path_pop).await.is_err() {
99 tokio::fs::create_dir_all(path_pop).await?;
100 }
101 let mut file = File::create(path).await?;
102 tokio::io::AsyncWriteExt::write_all(&mut file, &buf).await?;
103 tokio::io::AsyncWriteExt::flush(&mut file).await?;
104 file.sync_all().await?;
105 self.invalid_chunks.remove(&chunk.pos()[..]);
106 Ok(())
107 }
108
109 fn path(&self, pos: &[usize]) -> PathBuf {
110 if self.flat {
111 let mut chunk = String::new();
112 for dim in pos {
113 chunk.push_str(&dim.to_string());
114 chunk.push('_');
115 }
116 chunk.pop();
117
118 let mut buf = self.root.to_owned();
119 buf.push(chunk);
120 buf
121 } else {
122 let mut buf = self.root.to_owned();
123 for dim in pos {
124 buf.push(dim.to_string());
125 }
126 buf
127 }
128 }
129}
130
131#[derive(Debug)]
133pub struct FsReader<'a> {
134 _handle: &'a FsHandle,
135 file: BufReader<File>,
136}
137
138impl futures::AsyncRead for FsReader<'_> {
139 fn poll_read(
140 self: std::pin::Pin<&mut Self>,
141 cx: &mut std::task::Context<'_>,
142 buf: &mut [u8],
143 ) -> std::task::Poll<std::io::Result<usize>> {
144 let mut buf = tokio::io::ReadBuf::new(buf);
145 futures::ready!(tokio::io::AsyncRead::poll_read(
146 Pin::new(&mut self.get_mut().file),
147 cx,
148 &mut buf
149 ))?;
150 std::task::Poll::Ready(Ok(buf.filled().len()))
151 }
152}
153
154#[derive(Debug)]
159pub struct ShutdownHandle<T: dmds::Data, const DIMS: usize>
160where
161 T: Send + 'static,
162{
163 world: Arc<dmds::World<T, DIMS, FsHandle>>,
164}
165
166impl<T: dmds::Data + Send + 'static, const DIMS: usize> ShutdownHandle<T, DIMS> {
167 #[inline]
169 pub fn new(world: Arc<dmds::World<T, DIMS, FsHandle>>) -> Self {
170 Self { world }
171 }
172}
173
174impl<T: dmds::Data + Send + 'static, const DIMS: usize> Drop for ShutdownHandle<T, DIMS> {
175 fn drop(&mut self) {
176 let world = self.world.clone();
177 let join = std::thread::spawn(move || {
178 let iter = world.chunks().filter_map(|chunk| {
179 if chunk.writes() > 0 {
180 let chunk = chunk.value().clone();
181 let world = world.clone();
182 Some(async move {
183 let chunk = chunk;
184 world.io_handle().write_chunk(&chunk).await.unwrap();
185 })
186 } else {
187 None
188 }
189 });
190
191 let rt = tokio::runtime::Builder::new_current_thread()
192 .enable_all()
193 .build()
194 .unwrap();
195 rt.block_on(futures::future::join_all(iter));
196 });
197
198 let _result = join.join();
199 }
200}
201
202pub async fn daemon<T, const DIMS: usize>(
208 world: Arc<dmds::World<T, DIMS, FsHandle>>,
209 write_interval: Duration,
210) where
211 T: dmds::Data + Send + 'static,
212{
213 const LEAST_WRITES: usize = 1;
214
215 let _handle = ShutdownHandle::new(world.clone());
216
217 loop {
218 tokio::time::sleep(write_interval).await;
219
220 let iter = world.chunks().filter_map(|chunk| {
221 if chunk.writes() >= LEAST_WRITES {
222 let chunk = chunk.value().clone();
223 let world = world.clone();
224 Some(async move {
225 let chunk = chunk;
226 let _ = world.io_handle().write_chunk(&chunk).await;
227 })
228 } else {
229 None
230 }
231 });
232
233 futures::future::join_all(iter).await;
234 }
235}