dmds_tokio_fs/
lib.rs

1//! Tokio async filesystem implementation of `dmds` [`IoHandle`].
2
3#![warn(missing_docs)]
4
5use std::{
6    path::{Path, PathBuf},
7    pin::Pin,
8    sync::Arc,
9    time::Duration,
10};
11
12use bytes::{BufMut, BytesMut};
13use dashmap::DashSet;
14use dmds::IoHandle;
15use tokio::{
16    fs::File,
17    io::{AsyncReadExt, BufReader},
18};
19
20#[cfg(test)]
21mod tests;
22
23/// File system handler.
24#[derive(Debug)]
25pub struct FsHandle {
26    /// Root path of the database location.
27    root: PathBuf,
28    /// Whether the file path should be flat.
29    flat: bool,
30
31    /// Set of chunk positions that is not available.
32    invalid_chunks: DashSet<Box<[usize]>>,
33}
34
35impl IoHandle for FsHandle {
36    type Read<'a> = FsReader<'a> where Self: 'a;
37
38    #[inline]
39    fn hint_is_valid(&self, pos: &[usize]) -> bool {
40        !self.invalid_chunks.contains(pos)
41    }
42
43    async fn read_chunk<const DIMS: usize>(
44        &self,
45        pos: [usize; DIMS],
46    ) -> std::io::Result<(u32, Self::Read<'_>)> {
47        let path = self.path(&pos);
48        let result = File::open(&path).await;
49
50        if let Err(ref err) = result {
51            if err.kind() == std::io::ErrorKind::NotFound {
52                if self.invalid_chunks.len() > Self::IC_CAPACITY {
53                    self.invalid_chunks.clear();
54                }
55                self.invalid_chunks.insert(Box::new(pos));
56            }
57        }
58        let mut file = BufReader::new(result?);
59        let mut buf = [0_u8; 4];
60        file.read_exact(&mut buf).await?;
61
62        Ok((
63            u32::from_be_bytes(buf),
64            FsReader {
65                _handle: self,
66                file,
67            },
68        ))
69    }
70}
71
72impl FsHandle {
73    const IC_CAPACITY: usize = 1024 * 10;
74
75    /// Create a new file system handler.
76    pub fn new<P: AsRef<Path>>(root: P, flat: bool) -> Self {
77        Self {
78            root: root.as_ref().to_owned(),
79            flat,
80            invalid_chunks: DashSet::new(),
81        }
82    }
83
84    /// Write a chunk to the file system.
85    pub async fn write_chunk<const DIMS: usize, T: dmds::Data>(
86        &self,
87        chunk: &dmds::Chunk<T, DIMS>,
88    ) -> std::io::Result<()> {
89        let mut buf = BytesMut::new();
90        buf.put_u32(T::VERSION);
91        chunk.write_buf(&mut buf).await?;
92        let buf = buf.freeze();
93
94        let path = self.path(chunk.pos());
95        let mut path_pop = path.clone();
96        path_pop.pop();
97        let path_pop = path_pop;
98        if tokio::fs::read_dir(&path_pop).await.is_err() {
99            tokio::fs::create_dir_all(path_pop).await?;
100        }
101        let mut file = File::create(path).await?;
102        tokio::io::AsyncWriteExt::write_all(&mut file, &buf).await?;
103        tokio::io::AsyncWriteExt::flush(&mut file).await?;
104        file.sync_all().await?;
105        self.invalid_chunks.remove(&chunk.pos()[..]);
106        Ok(())
107    }
108
109    fn path(&self, pos: &[usize]) -> PathBuf {
110        if self.flat {
111            let mut chunk = String::new();
112            for dim in pos {
113                chunk.push_str(&dim.to_string());
114                chunk.push('_');
115            }
116            chunk.pop();
117
118            let mut buf = self.root.to_owned();
119            buf.push(chunk);
120            buf
121        } else {
122            let mut buf = self.root.to_owned();
123            for dim in pos {
124                buf.push(dim.to_string());
125            }
126            buf
127        }
128    }
129}
130
131/// Reader for the file system.
132#[derive(Debug)]
133pub struct FsReader<'a> {
134    _handle: &'a FsHandle,
135    file: BufReader<File>,
136}
137
138impl futures::AsyncRead for FsReader<'_> {
139    fn poll_read(
140        self: std::pin::Pin<&mut Self>,
141        cx: &mut std::task::Context<'_>,
142        buf: &mut [u8],
143    ) -> std::task::Poll<std::io::Result<usize>> {
144        let mut buf = tokio::io::ReadBuf::new(buf);
145        futures::ready!(tokio::io::AsyncRead::poll_read(
146            Pin::new(&mut self.get_mut().file),
147            cx,
148            &mut buf
149        ))?;
150        std::task::Poll::Ready(Ok(buf.filled().len()))
151    }
152}
153
154/// Shutdown handler for the world.
155///
156/// This handler will write all the dirty chunks to the file system
157/// in a blocking thread when dropped.
158#[derive(Debug)]
159pub struct ShutdownHandle<T: dmds::Data, const DIMS: usize>
160where
161    T: Send + 'static,
162{
163    world: Arc<dmds::World<T, DIMS, FsHandle>>,
164}
165
166impl<T: dmds::Data + Send + 'static, const DIMS: usize> ShutdownHandle<T, DIMS> {
167    /// Create a new shutdown handler.
168    #[inline]
169    pub fn new(world: Arc<dmds::World<T, DIMS, FsHandle>>) -> Self {
170        Self { world }
171    }
172}
173
174impl<T: dmds::Data + Send + 'static, const DIMS: usize> Drop for ShutdownHandle<T, DIMS> {
175    fn drop(&mut self) {
176        let world = self.world.clone();
177        let join = std::thread::spawn(move || {
178            let iter = world.chunks().filter_map(|chunk| {
179                if chunk.writes() > 0 {
180                    let chunk = chunk.value().clone();
181                    let world = world.clone();
182                    Some(async move {
183                        let chunk = chunk;
184                        world.io_handle().write_chunk(&chunk).await.unwrap();
185                    })
186                } else {
187                    None
188                }
189            });
190
191            let rt = tokio::runtime::Builder::new_current_thread()
192                .enable_all()
193                .build()
194                .unwrap();
195            rt.block_on(futures::future::join_all(iter));
196        });
197
198        let _result = join.join();
199    }
200}
201
202/// Runs the daemon.
203///
204/// This function will write all dirty chunk buffers to the file system.
205/// A [`ShutdownHandle`] will be created to write all the dirty chunks
206/// when the daemon is ended.
207pub async fn daemon<T, const DIMS: usize>(
208    world: Arc<dmds::World<T, DIMS, FsHandle>>,
209    write_interval: Duration,
210) where
211    T: dmds::Data + Send + 'static,
212{
213    const LEAST_WRITES: usize = 1;
214
215    let _handle = ShutdownHandle::new(world.clone());
216
217    loop {
218        tokio::time::sleep(write_interval).await;
219
220        let iter = world.chunks().filter_map(|chunk| {
221            if chunk.writes() >= LEAST_WRITES {
222                let chunk = chunk.value().clone();
223                let world = world.clone();
224                Some(async move {
225                    let chunk = chunk;
226                    let _ = world.io_handle().write_chunk(&chunk).await;
227                })
228            } else {
229                None
230            }
231        });
232
233        futures::future::join_all(iter).await;
234    }
235}