dynamo_runtime/storage/key_value_store/
file.rs1use std::collections::HashSet;
5use std::ffi::OsString;
6use std::fmt;
7use std::fs;
8use std::os::unix::ffi::OsStrExt;
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use std::time::Duration;
12use std::{collections::HashMap, pin::Pin};
13
14use anyhow::Context as _;
15use async_trait::async_trait;
16use futures::StreamExt;
17use inotify::{Event, EventMask, EventStream, Inotify, WatchMask};
18use parking_lot::Mutex;
19
20use crate::storage::key_value_store::KeyValue;
21
22use super::{Key, KeyValueBucket, KeyValueStore, StoreError, StoreOutcome, WatchEvent};
23
24#[derive(Clone)]
26pub struct FileStore {
27 root: PathBuf,
28 connection_id: u64,
29 active_dirs: Arc<Mutex<HashMap<PathBuf, Directory>>>,
32}
33
34impl FileStore {
35 pub(super) fn new<P: Into<PathBuf>>(root_dir: P) -> Self {
36 FileStore {
37 root: root_dir.into(),
38 connection_id: rand::random::<u64>(),
39 active_dirs: Arc::new(Mutex::new(HashMap::new())),
40 }
41 }
42}
43
44#[async_trait]
45impl KeyValueStore for FileStore {
46 type Bucket = Directory;
47
48 async fn get_or_create_bucket(
50 &self,
51 bucket_name: &str,
52 _ttl: Option<Duration>, ) -> Result<Self::Bucket, StoreError> {
54 let p = self.root.join(bucket_name);
55 if let Some(dir) = self.active_dirs.lock().get(&p) {
56 return Ok(dir.clone());
57 };
58
59 if p.exists() {
60 if !p.is_dir() {
62 return Err(StoreError::FilesystemError(
63 "Bucket name is not a directory".to_string(),
64 ));
65 }
66 } else {
67 fs::create_dir_all(&p).map_err(to_fs_err)?;
69 }
70 let dir = Directory::new(self.root.clone(), p.clone());
71 self.active_dirs.lock().insert(p, dir.clone());
72 Ok(dir)
73 }
74
75 async fn get_bucket(&self, bucket_name: &str) -> Result<Option<Self::Bucket>, StoreError> {
77 let p = self.root.join(bucket_name);
78 if let Some(dir) = self.active_dirs.lock().get(&p) {
79 return Ok(Some(dir.clone()));
80 };
81
82 if !p.exists() {
83 return Ok(None);
84 }
85 if !p.is_dir() {
86 return Err(StoreError::FilesystemError(
87 "Bucket name is not a directory".to_string(),
88 ));
89 }
90 let dir = Directory::new(self.root.clone(), p.clone());
91 self.active_dirs.lock().insert(p, dir.clone());
92 Ok(Some(dir))
93 }
94
95 fn connection_id(&self) -> u64 {
96 self.connection_id
97 }
98
99 fn shutdown(&self) {
102 for (_, mut dir) in self.active_dirs.lock().drain() {
103 if let Err(err) = dir.delete_owned_files() {
104 tracing::error!(error = %err, %dir, "Failed shutdown delete of owned files");
105 }
106 }
107 }
108}
109
110#[derive(Clone)]
111pub struct Directory {
112 root: PathBuf,
113 p: PathBuf,
114 owned_files: Arc<Mutex<HashSet<PathBuf>>>,
116}
117
118impl Directory {
119 fn new(root: PathBuf, p: PathBuf) -> Self {
120 Directory {
121 root,
122 p,
123 owned_files: Arc::new(Mutex::new(HashSet::new())),
124 }
125 }
126
127 fn delete_owned_files(&mut self) -> anyhow::Result<()> {
128 let mut errs = Vec::new();
129 for p in self.owned_files.lock().drain() {
130 if let Err(err) = fs::remove_file(&p) {
131 errs.push(format!("{}: {err}", p.display()));
132 }
133 }
134 if !errs.is_empty() {
135 anyhow::bail!(errs.join(", "));
136 }
137 Ok(())
138 }
139}
140
141impl fmt::Display for Directory {
142 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
143 write!(f, "{}", self.p.display())
144 }
145}
146
147#[async_trait]
148impl KeyValueBucket for Directory {
149 async fn insert(
151 &self,
152 key: &Key,
153 value: bytes::Bytes,
154 _revision: u64, ) -> Result<StoreOutcome, StoreError> {
156 let safe_key = Key::new(key.as_ref()); let full_path = self.p.join(safe_key.as_ref());
158 self.owned_files.lock().insert(full_path.clone());
159 let str_path = full_path.display().to_string();
160 fs::write(&full_path, &value)
161 .context(str_path)
162 .map_err(a_to_fs_err)?;
163 Ok(StoreOutcome::Created(0))
164 }
165
166 async fn get(&self, key: &Key) -> Result<Option<bytes::Bytes>, StoreError> {
168 let safe_key = Key::new(key.as_ref()); let full_path = self.p.join(safe_key.as_ref());
170 if !full_path.exists() {
171 return Ok(None);
172 }
173 let str_path = full_path.display().to_string();
174 let data: bytes::Bytes = fs::read(&full_path)
175 .context(str_path)
176 .map_err(a_to_fs_err)?
177 .into();
178 Ok(Some(data))
179 }
180
181 async fn delete(&self, key: &Key) -> Result<(), StoreError> {
183 let safe_key = Key::new(key.as_ref()); let full_path = self.p.join(safe_key.as_ref());
185 let str_path = full_path.display().to_string();
186 if !full_path.exists() {
187 return Err(StoreError::MissingKey(str_path));
188 }
189
190 self.owned_files.lock().remove(&full_path);
191
192 fs::remove_file(&full_path)
193 .context(str_path)
194 .map_err(a_to_fs_err)
195 }
196
197 async fn watch(
198 &self,
199 ) -> Result<Pin<Box<dyn futures::Stream<Item = WatchEvent> + Send + 'life0>>, StoreError> {
200 let inotify = Inotify::init().map_err(to_fs_err)?;
201 inotify
202 .watches()
203 .add(
204 &self.p,
205 WatchMask::MODIFY | WatchMask::CREATE | WatchMask::DELETE,
206 )
207 .map_err(to_fs_err)?;
208
209 let dir = self.p.clone();
210 Ok(Box::pin(async_stream::stream! {
211 let mut buffer = [0; 1024];
212 let mut events = match inotify.into_event_stream(&mut buffer) {
213 Ok(events) => events,
214 Err(err) => {
215 tracing::error!(error = %err, "Failed getting event stream from inotify");
216 return;
217 }
218 };
219 while let Some(Ok(event)) = events.next().await {
220 let Some(name) = event.name else {
221 tracing::warn!("Unexpected event on the directory itself");
222 continue;
223 };
224 let item_path = dir.join(name);
225 let key = match item_path.strip_prefix(&self.root) {
226 Ok(stripped) => stripped.display().to_string().replace("_", "/"),
227 Err(err) => {
228 tracing::error!(
231 error = %err,
232 item_path = %item_path.display(),
233 root = %self.root.display(),
234 "Item in file store is not prefixed with file store root. Should be impossible. Ignoring invalid key.");
235 continue;
236 }
237 };
238
239 match event.mask {
240 EventMask::MODIFY | EventMask::CREATE => {
241 let data: bytes::Bytes = match fs::read(&item_path) {
242 Ok(data) => data.into(),
243 Err(err) => {
244 tracing::warn!(error = %err, item = %item_path.display(), "Failed reading event item. Skipping.");
245 continue;
246 }
247 };
248 let item = KeyValue::new(key, data);
249 yield WatchEvent::Put(item);
250 }
251 EventMask::DELETE => {
252 yield WatchEvent::Delete(Key::from_raw(key));
253 }
254 event_type => {
255 tracing::warn!(?event_type, dir = %dir.display(), "Unexpected event type");
256 continue;
257 }
258 }
259 }
260 }))
261 }
262
263 async fn entries(&self) -> Result<HashMap<String, bytes::Bytes>, StoreError> {
264 let contents = fs::read_dir(&self.p)
265 .with_context(|| self.p.display().to_string())
266 .map_err(a_to_fs_err)?;
267 let mut out = HashMap::new();
268 for entry in contents {
269 let entry = entry.map_err(to_fs_err)?;
270 if !entry.path().is_file() {
271 tracing::warn!(
272 path = %entry.path().display(),
273 "Unexpected entry, directory should only contain files."
274 );
275 continue;
276 }
277
278 let key = match entry.path().strip_prefix(&self.root) {
279 Ok(p) => p.to_string_lossy().to_string().replace("_", "/"),
280 Err(err) => {
281 tracing::error!(
282 error = %err,
283 path = %entry.path().display(),
284 root = %self.root.display(),
285 "FileStore path not in root. Should be impossible. Skipping entry."
286 );
287 continue;
288 }
289 };
290 let data: bytes::Bytes = fs::read(entry.path())
291 .with_context(|| self.p.display().to_string())
292 .map_err(a_to_fs_err)?
293 .into();
294 out.insert(key, data);
295 }
296 Ok(out)
297 }
298}
299
300fn a_to_fs_err(err: anyhow::Error) -> StoreError {
302 StoreError::FilesystemError(format!("{err:#}"))
303}
304
305fn to_fs_err<E: std::error::Error>(err: E) -> StoreError {
306 StoreError::FilesystemError(err.to_string())
307}