1#[macro_use] extern crate scan_fmt;
49pub mod wal;
50
51use aiofut::{AIOBuilder, AIOManager};
52use async_trait::async_trait;
53use libc::off_t;
54use nix::fcntl::{fallocate, open, openat, FallocateFlags, OFlag};
55use nix::sys::stat::Mode;
56use nix::unistd::{ftruncate, mkdir, unlinkat, UnlinkatFlags};
57use std::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, OwnedFd};
58use std::sync::Arc;
59use wal::{WALBytes, WALFile, WALPos, WALStore};
60
/// A single WAL file whose reads and writes are submitted through an
/// [`AIOManager`].
pub struct WALFileAIO {
    /// Owned descriptor of the opened file; it is closed when this value is
    /// dropped.
    fd: OwnedFd,
    /// Shared AIO manager used to issue the asynchronous reads and writes.
    aiomgr: Arc<AIOManager>,
}
65
66impl WALFileAIO {
67 pub fn new(
68 rootfd: BorrowedFd, filename: &str, aiomgr: Arc<AIOManager>,
69 ) -> Result<Self, ()> {
70 openat(
71 rootfd.as_raw_fd(),
72 filename,
73 OFlag::O_CREAT | OFlag::O_RDWR,
74 Mode::S_IRUSR | Mode::S_IWUSR,
75 )
76 .and_then(|fd| {
77 Ok(WALFileAIO {
78 fd: unsafe { OwnedFd::from_raw_fd(fd) },
79 aiomgr,
80 })
81 })
82 .or_else(|_| Err(()))
83 }
84}
85
86#[async_trait(?Send)]
87impl WALFile for WALFileAIO {
88 async fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()> {
89 fallocate(
91 self.fd.as_raw_fd(),
92 FallocateFlags::FALLOC_FL_ZERO_RANGE,
93 offset as off_t,
94 length as off_t,
95 )
96 .and_then(|_| Ok(()))
97 .or_else(|_| Err(()))
98 }
99
100 fn truncate(&self, length: usize) -> Result<(), ()> {
101 ftruncate(&self.fd, length as off_t).or_else(|_| Err(()))
102 }
103
104 async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> {
105 let (res, data) = self
106 .aiomgr
107 .write(self.fd.as_raw_fd(), offset, data, None)
108 .await;
109 res.or_else(|_| Err(())).and_then(|nwrote| {
110 if nwrote == data.len() {
111 Ok(())
112 } else {
113 Err(())
114 }
115 })
116 }
117
118 async fn read(
119 &self, offset: WALPos, length: usize,
120 ) -> Result<Option<WALBytes>, ()> {
121 let (res, data) = self
122 .aiomgr
123 .read(self.fd.as_raw_fd(), offset, length, None)
124 .await;
125 res.or_else(|_| Err(())).and_then(|nread| {
126 Ok(if nread == length { Some(data) } else { None })
127 })
128 }
129}
130
/// A WAL store rooted at a directory, handing out [`WALFileAIO`] files that
/// share one [`AIOManager`].
pub struct WALStoreAIO {
    /// Owned descriptor of the WAL directory; file names are resolved
    /// relative to it when opening, removing and enumerating files.
    rootfd: OwnedFd,
    /// AIO manager cloned into every file opened from this store.
    aiomgr: Arc<AIOManager>,
}
135
// NOTE(review): this manually asserts that a `WALStoreAIO` may be moved to
// another thread. Presumably it is needed because `Arc<AIOManager>` is not
// automatically `Send`; confirm that `AIOManager` is actually safe to use
// from multiple threads before relying on this.
unsafe impl Send for WALStoreAIO {}
137
138impl WALStoreAIO {
139 pub fn new(
140 wal_dir: &str, truncate: bool, rootfd: Option<BorrowedFd>,
141 aiomgr: Option<AIOManager>,
142 ) -> Result<Self, ()> {
143 let aiomgr = Arc::new(aiomgr.ok_or(Err(())).or_else(
144 |_: Result<AIOManager, ()>| {
145 AIOBuilder::default().build().or(Err(()))
146 },
147 )?);
148
149 if truncate {
150 let _ = std::fs::remove_dir_all(wal_dir);
151 }
152 let walfd;
153 match rootfd {
154 None => {
155 match mkdir(
156 wal_dir,
157 Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IXUSR,
158 ) {
159 Err(e) => {
160 if truncate {
161 panic!("error while creating directory: {}", e)
162 }
163 }
164 Ok(_) => (),
165 }
166 walfd = match open(
167 wal_dir,
168 OFlag::O_DIRECTORY | OFlag::O_PATH,
169 Mode::empty(),
170 ) {
171 Ok(fd) => fd,
172 Err(_) => panic!("error while opening the WAL directory"),
173 }
174 }
175 Some(fd) => {
176 let dirstr = std::ffi::CString::new(wal_dir).unwrap();
177 let ret = unsafe {
178 libc::mkdirat(
179 fd.as_raw_fd(),
180 dirstr.as_ptr(),
181 libc::S_IRUSR | libc::S_IWUSR | libc::S_IXUSR,
182 )
183 };
184 if ret != 0 {
185 if truncate {
186 panic!("error while creating directory")
187 }
188 }
189 walfd = match nix::fcntl::openat(
190 fd.as_raw_fd(),
191 wal_dir,
192 OFlag::O_DIRECTORY | OFlag::O_PATH,
193 Mode::empty(),
194 ) {
195 Ok(fd) => fd,
196 Err(_) => panic!("error while opening the WAL directory"),
197 }
198 }
199 }
200 Ok(WALStoreAIO {
201 rootfd: unsafe { OwnedFd::from_raw_fd(walfd) },
202 aiomgr,
203 })
204 }
205}
206
207#[async_trait(?Send)]
208impl WALStore for WALStoreAIO {
209 type FileNameIter = std::vec::IntoIter<String>;
210
211 async fn open_file(
212 &self, filename: &str, _touch: bool,
213 ) -> Result<Box<dyn WALFile>, ()> {
214 let filename = filename.to_string();
215 WALFileAIO::new(self.rootfd.as_fd(), &filename, self.aiomgr.clone())
216 .and_then(|f| Ok(Box::new(f) as Box<dyn WALFile>))
217 }
218
219 async fn remove_file(&self, filename: String) -> Result<(), ()> {
220 unlinkat(
221 Some(self.rootfd.as_raw_fd()),
222 filename.as_str(),
223 UnlinkatFlags::NoRemoveDir,
224 )
225 .or_else(|_| Err(()))
226 }
227
228 fn enumerate_files(&self) -> Result<Self::FileNameIter, ()> {
229 let mut logfiles = Vec::new();
230 for ent in nix::dir::Dir::openat(
231 self.rootfd.as_raw_fd(),
232 "./",
233 OFlag::empty(),
234 Mode::empty(),
235 )
236 .unwrap()
237 .iter()
238 {
239 logfiles
240 .push(ent.unwrap().file_name().to_str().unwrap().to_string())
241 }
242 Ok(logfiles.into_iter())
243 }
244}