// dynamo_runtime/storage/kv/file.rs
use std::cmp;
5use std::collections::HashSet;
6use std::ffi::OsString;
7use std::fmt;
8use std::fs;
9use std::fs::OpenOptions;
10use std::os::unix::ffi::OsStrExt;
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13use std::thread;
14use std::time::Duration;
15use std::time::SystemTime;
16use std::{collections::HashMap, pin::Pin};
17
18use anyhow::Context as _;
19use async_trait::async_trait;
20use futures::StreamExt;
21use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher, event};
22use parking_lot::Mutex;
23use tokio_util::sync::CancellationToken;
24
25use super::{Bucket, Key, KeyValue, Store, StoreError, StoreOutcome, WatchEvent};
26
/// TTL applied to a bucket when the caller does not specify one.
const DEFAULT_TTL: Duration = Duration::from_secs(10);

/// Lower bound for bucket TTLs and for the keep-alive refresh interval.
const MIN_KEEP_ALIVE: Duration = Duration::from_secs(1);
33
/// Filesystem-backed key-value store: buckets are directories under `root`,
/// values are files, and a file's mtime acts as its keep-alive timestamp.
#[derive(Clone)]
pub struct FileStore {
    // Cancels the background expiry thread spawned in `new`.
    cancel_token: CancellationToken,
    // Top-level directory containing all bucket directories.
    root: PathBuf,
    // Random per-process identifier returned by `Store::connection_id`.
    connection_id: u64,
    // Buckets this store has opened, keyed by bucket directory path.
    // Shared (via Arc) with the clone running on the expiry thread.
    active_dirs: Arc<Mutex<HashMap<PathBuf, Directory>>>,
}
44
45impl FileStore {
46 pub(super) fn new<P: Into<PathBuf>>(cancel_token: CancellationToken, root_dir: P) -> Self {
47 let fs = FileStore {
48 cancel_token,
49 root: root_dir.into(),
50 connection_id: rand::random::<u64>(),
51 active_dirs: Arc::new(Mutex::new(HashMap::new())),
52 };
53 let c = fs.clone();
54 thread::spawn(move || c.expiry_thread());
55 fs
56 }
57
58 fn expiry_thread(&self) {
64 loop {
65 let ttl = self.shortest_ttl();
66 let keep_alive_interval = cmp::max(ttl / 3, MIN_KEEP_ALIVE);
67
68 if self.cancel_token.is_cancelled() {
70 break;
71 }
72
73 thread::sleep(keep_alive_interval);
74
75 if self.cancel_token.is_cancelled() {
76 break;
77 }
78
79 self.keep_alive();
80 if let Err(err) = self.delete_expired_files() {
81 tracing::error!(error = %err, "FileStore delete_expired_files");
82 }
83 }
84 }
85
86 fn shortest_ttl(&self) -> Duration {
88 let mut ttl = DEFAULT_TTL;
89 let active_dirs = self.active_dirs.lock().clone();
90 for (_, dir) in active_dirs {
91 ttl = cmp::min(ttl, dir.ttl);
92 }
93 tracing::trace!("FileStore expiry shortest ttl {ttl:?}");
94 ttl
95 }
96
97 fn keep_alive(&self) {
98 let active_dirs = self.active_dirs.lock().clone();
99 for (_, dir) in active_dirs {
100 dir.keep_alive();
101 }
102 }
103
104 fn delete_expired_files(&self) -> anyhow::Result<()> {
105 let active_dirs = self.active_dirs.lock().clone();
106 for (path, dir) in active_dirs {
107 dir.delete_expired_files()
108 .with_context(|| path.display().to_string())?;
109 }
110 Ok(())
111 }
112}
113
114#[async_trait]
115impl Store for FileStore {
116 type Bucket = Directory;
117
118 async fn get_or_create_bucket(
120 &self,
121 bucket_name: &str,
122 ttl: Option<Duration>,
123 ) -> Result<Self::Bucket, StoreError> {
124 let p = self.root.join(bucket_name);
125 if let Some(dir) = self.active_dirs.lock().get(&p) {
126 return Ok(dir.clone());
127 };
128
129 if p.exists() {
130 if !p.is_dir() {
132 return Err(StoreError::FilesystemError(
133 "Bucket name is not a directory".to_string(),
134 ));
135 }
136 } else {
137 fs::create_dir_all(&p).map_err(to_fs_err)?;
139 }
140 let dir = Directory::new(self.root.clone(), p.clone(), ttl.unwrap_or(DEFAULT_TTL));
141 self.active_dirs.lock().insert(p, dir.clone());
142 Ok(dir)
143 }
144
145 async fn get_bucket(&self, bucket_name: &str) -> Result<Option<Self::Bucket>, StoreError> {
147 let p = self.root.join(bucket_name);
148 if let Some(dir) = self.active_dirs.lock().get(&p) {
149 return Ok(Some(dir.clone()));
150 };
151
152 if !p.exists() {
153 return Ok(None);
154 }
155 if !p.is_dir() {
156 return Err(StoreError::FilesystemError(
157 "Bucket name is not a directory".to_string(),
158 ));
159 }
160 let dir = Directory::new(self.root.clone(), p.clone(), DEFAULT_TTL);
162 self.active_dirs.lock().insert(p, dir.clone());
163 Ok(Some(dir))
164 }
165
166 fn connection_id(&self) -> u64 {
167 self.connection_id
168 }
169
170 fn shutdown(&self) {
173 for (_, mut dir) in self.active_dirs.lock().drain() {
174 if let Err(err) = dir.delete_owned_files() {
175 tracing::error!(error = %err, %dir, "Failed shutdown delete of owned files");
176 }
177 }
178 }
179}
180
/// A single bucket: one directory whose files are the key-value pairs.
/// Clones share the same `owned_files` set via `Arc`.
#[derive(Clone)]
pub struct Directory {
    // Canonicalized store root; stripped from item paths to recover keys.
    root: PathBuf,
    // This bucket's directory (store root joined with the bucket name).
    p: PathBuf,
    // Files older than this are deleted; never below MIN_KEEP_ALIVE.
    ttl: Duration,
    // Files written by this process; kept alive and removed on shutdown.
    owned_files: Arc<Mutex<HashSet<PathBuf>>>,
}
189
190impl Directory {
191 fn new(root: PathBuf, p: PathBuf, ttl: Duration) -> Self {
192 let canonical_root = root.canonicalize().unwrap_or_else(|_| root.clone());
194 if ttl < MIN_KEEP_ALIVE {
195 let h_ttl = humantime::format_duration(ttl);
196 tracing::warn!(path = %p.display(), ttl = %h_ttl, "ttl is too short, increasing to {}", humantime::format_duration(MIN_KEEP_ALIVE));
197 }
198 let ttl = cmp::max(ttl, MIN_KEEP_ALIVE);
199 Directory {
200 root: canonical_root,
201 p,
202 ttl,
203 owned_files: Arc::new(Mutex::new(HashSet::new())),
204 }
205 }
206
207 fn keep_alive(&self) {
209 let owned_files = self.owned_files.lock().clone();
210 for path in owned_files {
211 let file = match OpenOptions::new().write(true).open(&path) {
212 Ok(f) => f,
213 Err(err) => {
214 tracing::error!(path = %path.display(), error = %err, "FileStore::keep_alive failed opening owned file");
215 continue;
216 }
217 };
218 if let Err(err) = file.set_modified(SystemTime::now()) {
219 tracing::error!(path = %path.display(), error = %err, "FileStore::keep_alive failed set_modified on owned file");
220 continue;
221 }
222 tracing::trace!("FileStore keep_alive set {}", path.display());
223 }
224 }
225
226 fn delete_expired_files(&self) -> anyhow::Result<()> {
231 let deadline = SystemTime::now() - self.ttl;
232 let dirname = self.p.display().to_string();
233 for entry in fs::read_dir(&self.p).with_context(|| dirname.clone())? {
234 let entry = match entry {
235 Ok(p) => p,
236 Err(err) => {
237 tracing::warn!(dir = dirname, error = %err, "File store could read directory contents");
238 continue;
239 }
240 };
241 if !entry.file_type().map(|f| f.is_file()).unwrap_or(false) {
242 tracing::warn!(dir = dirname, entry = %entry.path().display(), "File store directory should only contain files");
243 continue;
244 }
245 let ctx = entry.path().display().to_string();
246 let metadata = match entry.metadata() {
247 Ok(m) => m,
248 Err(err) => {
249 tracing::warn!(path = %ctx, error = %err, "Failed fetching metadata");
250 continue;
251 }
252 };
253 let last_modified = match metadata.modified() {
254 Ok(lm) => lm,
255 Err(err) => {
256 tracing::warn!(path = %ctx, error = %err, "Failed reading mtime");
259 continue;
260 }
261 };
262 if last_modified < deadline {
263 tracing::info!(path = ctx, ?last_modified, "Expired");
264 if let Err(err) = fs::remove_file(entry.path()) {
265 tracing::warn!(path = %ctx, error = %err, "Failed removing");
266 }
267 }
268 }
269 Ok(())
270 }
271
272 fn delete_owned_files(&mut self) -> anyhow::Result<()> {
273 let mut errs = Vec::new();
274 for p in self.owned_files.lock().drain() {
275 if let Err(err) = fs::remove_file(&p) {
276 errs.push(format!("{}: {err}", p.display()));
277 }
278 }
279 if !errs.is_empty() {
280 anyhow::bail!(errs.join(", "));
281 }
282 Ok(())
283 }
284}
285
286impl fmt::Display for Directory {
287 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
288 write!(f, "{}", self.p.display())
289 }
290}
291
292#[async_trait]
293impl Bucket for Directory {
294 async fn insert(
296 &self,
297 key: &Key,
298 value: bytes::Bytes,
299 _revision: u64, ) -> Result<StoreOutcome, StoreError> {
301 let safe_key = key.url_safe();
302 let full_path = self.p.join(safe_key.as_ref());
303 self.owned_files.lock().insert(full_path.clone());
304 let str_path = full_path.display().to_string();
305 fs::write(&full_path, &value)
306 .context(str_path)
307 .map_err(a_to_fs_err)?;
308 Ok(StoreOutcome::Created(0))
309 }
310
311 async fn get(&self, key: &Key) -> Result<Option<bytes::Bytes>, StoreError> {
313 let safe_key = key.url_safe();
314 let full_path = self.p.join(safe_key.as_ref());
315 if !full_path.exists() {
316 return Ok(None);
317 }
318 let str_path = full_path.display().to_string();
319 let data: bytes::Bytes = fs::read(&full_path)
320 .context(str_path)
321 .map_err(a_to_fs_err)?
322 .into();
323 Ok(Some(data))
324 }
325
326 async fn delete(&self, key: &Key) -> Result<(), StoreError> {
328 let safe_key = key.url_safe();
329 let full_path = self.p.join(safe_key.as_ref());
330 let str_path = full_path.display().to_string();
331 if !full_path.exists() {
332 return Err(StoreError::MissingKey(str_path));
333 }
334
335 self.owned_files.lock().remove(&full_path);
336
337 fs::remove_file(&full_path)
338 .context(str_path)
339 .map_err(a_to_fs_err)
340 }
341
342 async fn watch(
343 &self,
344 ) -> Result<Pin<Box<dyn futures::Stream<Item = WatchEvent> + Send + 'life0>>, StoreError> {
345 let (tx, mut rx) = tokio::sync::mpsc::channel(128);
346
347 let mut watcher = RecommendedWatcher::new(
348 move |res: Result<Event, notify::Error>| {
349 if let Err(err) = tx.blocking_send(res) {
350 tracing::error!(error = %err, "Failed to send file watch event");
351 }
352 },
353 Config::default(),
354 )
355 .map_err(to_fs_err)?;
356
357 watcher
358 .watch(&self.p, RecursiveMode::NonRecursive)
359 .map_err(to_fs_err)?;
360
361 let dir = self.p.clone();
362 let root = self.root.clone();
363
364 Ok(Box::pin(async_stream::stream! {
365 let _watcher = watcher;
367
368 while let Some(event_result) = rx.recv().await {
369 let event = match event_result {
370 Ok(event) => event,
371 Err(err) => {
372 tracing::error!(error = %err, "Failed receiving file watch event");
373 continue;
374 }
375 };
376
377 for item_path in event.paths {
378 if item_path == dir {
380 tracing::warn!("Unexpected event on the directory itself");
381 continue;
382 }
383
384 let canonical_item_path = item_path.canonicalize().unwrap_or_else(|_| item_path.clone());
387
388 let key = match canonical_item_path.strip_prefix(&root) {
389 Ok(stripped) => Key::from_url_safe(&stripped.display().to_string()),
390 Err(err) => {
391 tracing::error!(
394 error = %err,
395 item_path = %canonical_item_path.display(),
396 root = %root.display(),
397 "Item in file store is not prefixed with file store root. Should be impossible. Ignoring invalid key.");
398 continue;
399 }
400 };
401
402 match event.kind {
403 EventKind::Create(event::CreateKind::File) | EventKind::Modify(event::ModifyKind::Data(event::DataChange::Content)) => {
404 let data: bytes::Bytes = match fs::read(&item_path) {
405 Ok(data) => data.into(),
406 Err(err) => {
407 tracing::warn!(error = %err, item = %item_path.display(), "Failed reading event item. Skipping.");
408 continue;
409 }
410 };
411 let item = KeyValue::new(key, data);
412 yield WatchEvent::Put(item);
413 }
414 EventKind::Remove(event::RemoveKind::File) => {
415 yield WatchEvent::Delete(key);
416 }
417 _ => {
418 continue;
420 }
421 }
422 }
423 }
424 }))
425 }
426
427 async fn entries(&self) -> Result<HashMap<Key, bytes::Bytes>, StoreError> {
428 let contents = fs::read_dir(&self.p)
429 .with_context(|| self.p.display().to_string())
430 .map_err(a_to_fs_err)?;
431 let mut out = HashMap::new();
432 for entry in contents {
433 let entry = entry.map_err(to_fs_err)?;
434 if !entry.path().is_file() {
435 tracing::warn!(
436 path = %entry.path().display(),
437 "Unexpected entry, directory should only contain files."
438 );
439 continue;
440 }
441
442 let canonical_entry_path = match entry.path().canonicalize() {
444 Ok(p) => p,
445 Err(err) => {
446 tracing::warn!(error = %err, path = %entry.path().display(), "Failed to canonicalize path. Using original path.");
447 entry.path()
448 }
449 };
450
451 let key = match canonical_entry_path.strip_prefix(&self.root) {
452 Ok(p) => Key::from_url_safe(&p.to_string_lossy()),
453 Err(err) => {
454 tracing::error!(
455 error = %err,
456 path = %canonical_entry_path.display(),
457 root = %self.root.display(),
458 "FileStore path not in root. Should be impossible. Skipping entry."
459 );
460 continue;
461 }
462 };
463 let data: bytes::Bytes = fs::read(entry.path())
464 .with_context(|| self.p.display().to_string())
465 .map_err(a_to_fs_err)?
466 .into();
467 out.insert(key, data);
468 }
469 Ok(out)
470 }
471}
472
473fn a_to_fs_err(err: anyhow::Error) -> StoreError {
475 StoreError::FilesystemError(format!("{err:#}"))
476}
477
478fn to_fs_err<E: std::error::Error>(err: E) -> StoreError {
479 StoreError::FilesystemError(err.to_string())
480}
481
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use tokio_util::sync::CancellationToken;

    use crate::storage::kv::{Bucket as _, FileStore, Key, Store as _};

    /// Keys returned by `entries` are full paths relative to the store root,
    /// including the bucket name.
    #[tokio::test]
    async fn test_entries_full_path() {
        let tmp = tempfile::tempdir().unwrap();

        let cancel_token = CancellationToken::new();
        let store = FileStore::new(cancel_token.clone(), tmp.path());
        let bucket = store.get_or_create_bucket("v1/tests", None).await.unwrap();

        for (key, value) in [("key1/multi/part", "value1"), ("key2", "value2")] {
            bucket
                .insert(&Key::new(key.to_string()), value.into(), 0)
                .await
                .unwrap();
        }

        let entries = bucket.entries().await.unwrap();
        let keys: HashSet<Key> = entries.into_keys().collect();
        cancel_token.cancel();

        assert!(keys.contains(&Key::new("v1/tests/key1/multi/part".to_string())));
        assert!(keys.contains(&Key::new("v1/tests/key2".to_string())));
    }
}
512}