growthring/
lib.rs

1//! Simple and modular write-ahead-logging implementation.
2//!
3//! # Examples
4//!
5//! ```
6//! use growthring::{WALStoreAIO, wal::WALLoader};
7//! use futures::executor::block_on;
8//! let mut loader = WALLoader::new();
9//! loader.file_nbit(9).block_nbit(8);
10//!
11//!
12//! // Start with empty WAL (truncate = true).
13//! let store = WALStoreAIO::new("./walfiles", true, None, None).unwrap();
14//! let mut wal = block_on(loader.load(store, |_, _| {Ok(())}, 0)).unwrap();
15//! // Write a vector of records to WAL.
16//! for f in wal.grow(vec!["record1(foo)", "record2(bar)", "record3(foobar)"]).into_iter() {
17//!     let ring_id = block_on(f).unwrap().1;
18//!     println!("WAL recorded record to {:?}", ring_id);
19//! }
20//!
21//!
22//! // Load from WAL (truncate = false).
23//! let store = WALStoreAIO::new("./walfiles", false, None, None).unwrap();
24//! let mut wal = block_on(loader.load(store, |payload, ringid| {
25//!     // redo the operations in your application
26//!     println!("recover(payload={}, ringid={:?})",
27//!              std::str::from_utf8(&payload).unwrap(),
28//!              ringid);
29//!     Ok(())
30//! }, 0)).unwrap();
31//! // We saw some log playback, even there is no failure.
32//! // Let's try to grow the WAL to create many files.
33//! let ring_ids = wal.grow((1..100).into_iter().map(|i| "a".repeat(i)).collect::<Vec<_>>())
34//!                   .into_iter().map(|f| block_on(f).unwrap().1).collect::<Vec<_>>();
35//! // Then assume all these records are not longer needed. We can tell WALWriter by the `peel`
36//! // method.
37//! block_on(wal.peel(ring_ids, 0)).unwrap();
38//! // There will only be one remaining file in ./walfiles.
39//!
40//! let store = WALStoreAIO::new("./walfiles", false, None, None).unwrap();
41//! let wal = block_on(loader.load(store, |payload, _| {
42//!     println!("payload.len() = {}", payload.len());
43//!     Ok(())
44//! }, 0)).unwrap();
45//! // After each recovery, the ./walfiles is empty.
46//! ```
47
48#[macro_use] extern crate scan_fmt;
49pub mod wal;
50
51use aiofut::{AIOBuilder, AIOManager};
52use async_trait::async_trait;
53use libc::off_t;
54use nix::fcntl::{fallocate, open, openat, FallocateFlags, OFlag};
55use nix::sys::stat::Mode;
56use nix::unistd::{ftruncate, mkdir, unlinkat, UnlinkatFlags};
57use std::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, OwnedFd};
58use std::sync::Arc;
59use wal::{WALBytes, WALFile, WALPos, WALStore};
60
61pub struct WALFileAIO {
62    fd: OwnedFd,
63    aiomgr: Arc<AIOManager>,
64}
65
66impl WALFileAIO {
67    pub fn new(
68        rootfd: BorrowedFd, filename: &str, aiomgr: Arc<AIOManager>,
69    ) -> Result<Self, ()> {
70        openat(
71            rootfd.as_raw_fd(),
72            filename,
73            OFlag::O_CREAT | OFlag::O_RDWR,
74            Mode::S_IRUSR | Mode::S_IWUSR,
75        )
76        .and_then(|fd| {
77            Ok(WALFileAIO {
78                fd: unsafe { OwnedFd::from_raw_fd(fd) },
79                aiomgr,
80            })
81        })
82        .or_else(|_| Err(()))
83    }
84}
85
86#[async_trait(?Send)]
87impl WALFile for WALFileAIO {
88    async fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()> {
89        // TODO: is there any async version of fallocate?
90        fallocate(
91            self.fd.as_raw_fd(),
92            FallocateFlags::FALLOC_FL_ZERO_RANGE,
93            offset as off_t,
94            length as off_t,
95        )
96        .and_then(|_| Ok(()))
97        .or_else(|_| Err(()))
98    }
99
100    fn truncate(&self, length: usize) -> Result<(), ()> {
101        ftruncate(&self.fd, length as off_t).or_else(|_| Err(()))
102    }
103
104    async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> {
105        let (res, data) = self
106            .aiomgr
107            .write(self.fd.as_raw_fd(), offset, data, None)
108            .await;
109        res.or_else(|_| Err(())).and_then(|nwrote| {
110            if nwrote == data.len() {
111                Ok(())
112            } else {
113                Err(())
114            }
115        })
116    }
117
118    async fn read(
119        &self, offset: WALPos, length: usize,
120    ) -> Result<Option<WALBytes>, ()> {
121        let (res, data) = self
122            .aiomgr
123            .read(self.fd.as_raw_fd(), offset, length, None)
124            .await;
125        res.or_else(|_| Err(())).and_then(|nread| {
126            Ok(if nread == length { Some(data) } else { None })
127        })
128    }
129}
130
131pub struct WALStoreAIO {
132    rootfd: OwnedFd,
133    aiomgr: Arc<AIOManager>,
134}
135
136unsafe impl Send for WALStoreAIO {}
137
138impl WALStoreAIO {
139    pub fn new(
140        wal_dir: &str, truncate: bool, rootfd: Option<BorrowedFd>,
141        aiomgr: Option<AIOManager>,
142    ) -> Result<Self, ()> {
143        let aiomgr = Arc::new(aiomgr.ok_or(Err(())).or_else(
144            |_: Result<AIOManager, ()>| {
145                AIOBuilder::default().build().or(Err(()))
146            },
147        )?);
148
149        if truncate {
150            let _ = std::fs::remove_dir_all(wal_dir);
151        }
152        let walfd;
153        match rootfd {
154            None => {
155                match mkdir(
156                    wal_dir,
157                    Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IXUSR,
158                ) {
159                    Err(e) => {
160                        if truncate {
161                            panic!("error while creating directory: {}", e)
162                        }
163                    }
164                    Ok(_) => (),
165                }
166                walfd = match open(
167                    wal_dir,
168                    OFlag::O_DIRECTORY | OFlag::O_PATH,
169                    Mode::empty(),
170                ) {
171                    Ok(fd) => fd,
172                    Err(_) => panic!("error while opening the WAL directory"),
173                }
174            }
175            Some(fd) => {
176                let dirstr = std::ffi::CString::new(wal_dir).unwrap();
177                let ret = unsafe {
178                    libc::mkdirat(
179                        fd.as_raw_fd(),
180                        dirstr.as_ptr(),
181                        libc::S_IRUSR | libc::S_IWUSR | libc::S_IXUSR,
182                    )
183                };
184                if ret != 0 {
185                    if truncate {
186                        panic!("error while creating directory")
187                    }
188                }
189                walfd = match nix::fcntl::openat(
190                    fd.as_raw_fd(),
191                    wal_dir,
192                    OFlag::O_DIRECTORY | OFlag::O_PATH,
193                    Mode::empty(),
194                ) {
195                    Ok(fd) => fd,
196                    Err(_) => panic!("error while opening the WAL directory"),
197                }
198            }
199        }
200        Ok(WALStoreAIO {
201            rootfd: unsafe { OwnedFd::from_raw_fd(walfd) },
202            aiomgr,
203        })
204    }
205}
206
207#[async_trait(?Send)]
208impl WALStore for WALStoreAIO {
209    type FileNameIter = std::vec::IntoIter<String>;
210
211    async fn open_file(
212        &self, filename: &str, _touch: bool,
213    ) -> Result<Box<dyn WALFile>, ()> {
214        let filename = filename.to_string();
215        WALFileAIO::new(self.rootfd.as_fd(), &filename, self.aiomgr.clone())
216            .and_then(|f| Ok(Box::new(f) as Box<dyn WALFile>))
217    }
218
219    async fn remove_file(&self, filename: String) -> Result<(), ()> {
220        unlinkat(
221            Some(self.rootfd.as_raw_fd()),
222            filename.as_str(),
223            UnlinkatFlags::NoRemoveDir,
224        )
225        .or_else(|_| Err(()))
226    }
227
228    fn enumerate_files(&self) -> Result<Self::FileNameIter, ()> {
229        let mut logfiles = Vec::new();
230        for ent in nix::dir::Dir::openat(
231            self.rootfd.as_raw_fd(),
232            "./",
233            OFlag::empty(),
234            Mode::empty(),
235        )
236        .unwrap()
237        .iter()
238        {
239            logfiles
240                .push(ent.unwrap().file_name().to_str().unwrap().to_string())
241        }
242        Ok(logfiles.into_iter())
243    }
244}