// dynamo_runtime/storage/kv/file.rs
use std::cmp;
5use std::collections::HashSet;
6use std::ffi::OsString;
7use std::fmt;
8use std::fs;
9use std::fs::OpenOptions;
10use std::os::unix::ffi::OsStrExt;
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13use std::thread;
14use std::time::Duration;
15use std::time::SystemTime;
16use std::{collections::HashMap, pin::Pin};
17
18use anyhow::Context as _;
19use async_trait::async_trait;
20use futures::StreamExt;
21use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher, event};
22use parking_lot::Mutex;
23use tokio_util::sync::CancellationToken;
24
25use super::{Bucket, Key, KeyValue, Store, StoreError, StoreOutcome, WatchEvent};
26
/// Expiry ttl used when a bucket is created without an explicit ttl.
const DEFAULT_TTL: Duration = Duration::from_secs(10);

/// Floor for bucket ttls and for the expiry thread's keep-alive tick.
const MIN_KEEP_ALIVE: Duration = Duration::from_secs(1);

/// Name prefix for temp files used to make inserts atomic; `watch` and
/// `entries` skip files carrying this prefix.
const TEMP_FILE_PREFIX: &str = ".tmp_";
37
/// A `Store` implementation backed by a directory tree on local disk.
/// Each bucket is a subdirectory; each key is a file. Liveness is tracked
/// via file mtimes, refreshed by a background expiry thread.
#[derive(Clone)]
pub struct FileStore {
    // Stops the background expiry thread when cancelled.
    cancel_token: CancellationToken,
    // Root directory; bucket directories are created beneath it.
    root: PathBuf,
    // Random per-instance id returned by `connection_id()`.
    connection_id: u64,
    // Buckets handed out so far, keyed by full directory path.
    // Shared across clones (including the expiry thread's clone).
    active_dirs: Arc<Mutex<HashMap<PathBuf, Directory>>>,
}
48
49impl FileStore {
50 pub(super) fn new<P: Into<PathBuf>>(cancel_token: CancellationToken, root_dir: P) -> Self {
51 let fs = FileStore {
52 cancel_token,
53 root: root_dir.into(),
54 connection_id: rand::random::<u64>(),
55 active_dirs: Arc::new(Mutex::new(HashMap::new())),
56 };
57 let c = fs.clone();
58 thread::spawn(move || c.expiry_thread());
59 fs
60 }
61
62 fn expiry_thread(&self) {
68 loop {
69 let ttl = self.shortest_ttl();
70 let keep_alive_interval = cmp::max(ttl / 3, MIN_KEEP_ALIVE);
71
72 if self.cancel_token.is_cancelled() {
74 break;
75 }
76
77 thread::sleep(keep_alive_interval);
78
79 if self.cancel_token.is_cancelled() {
80 break;
81 }
82
83 self.keep_alive();
84 if let Err(err) = self.delete_expired_files() {
85 tracing::error!(error = %err, "FileStore delete_expired_files");
86 }
87 }
88 }
89
90 fn shortest_ttl(&self) -> Duration {
92 let mut ttl = DEFAULT_TTL;
93 let active_dirs = self.active_dirs.lock().clone();
94 for (_, dir) in active_dirs {
95 ttl = cmp::min(ttl, dir.ttl);
96 }
97 tracing::trace!("FileStore expiry shortest ttl {ttl:?}");
98 ttl
99 }
100
101 fn keep_alive(&self) {
102 let active_dirs = self.active_dirs.lock().clone();
103 for (_, dir) in active_dirs {
104 dir.keep_alive();
105 }
106 }
107
108 fn delete_expired_files(&self) -> anyhow::Result<()> {
109 let active_dirs = self.active_dirs.lock().clone();
110 for (path, dir) in active_dirs {
111 dir.delete_expired_files()
112 .with_context(|| path.display().to_string())?;
113 }
114 Ok(())
115 }
116}
117
#[async_trait]
impl Store for FileStore {
    type Bucket = Directory;

    /// Fetch the bucket named `bucket_name` (a path relative to the store
    /// root), creating its directory on disk if needed.
    ///
    /// `ttl` applies only when the bucket is newly activated; a bucket
    /// already in `active_dirs` keeps the ttl it was created with.
    /// Errors with `FilesystemError` if the path exists but is not a
    /// directory, or if the directory cannot be created.
    async fn get_or_create_bucket(
        &self,
        bucket_name: &str,
        ttl: Option<Duration>,
    ) -> Result<Self::Bucket, StoreError> {
        let p = self.root.join(bucket_name);
        // Fast path: bucket already active, hand out the shared handle.
        if let Some(dir) = self.active_dirs.lock().get(&p) {
            return Ok(dir.clone());
        };

        if p.exists() {
            if !p.is_dir() {
                return Err(StoreError::FilesystemError(
                    "Bucket name is not a directory".to_string(),
                ));
            }
        } else {
            fs::create_dir_all(&p).map_err(to_fs_err)?;
        }
        // NOTE(review): the lock is released between the lookup above and
        // the insert below, so two concurrent callers can each build a
        // Directory for the same path; the second insert wins and the first
        // handle's owned-file set is orphaned. Confirm whether this race
        // matters to callers.
        let dir = Directory::new(self.root.clone(), p.clone(), ttl.unwrap_or(DEFAULT_TTL));
        self.active_dirs.lock().insert(p, dir.clone());
        Ok(dir)
    }

    /// Fetch an existing bucket, returning `None` if its directory does not
    /// exist on disk. A bucket found on disk but not yet active is
    /// activated with DEFAULT_TTL.
    async fn get_bucket(&self, bucket_name: &str) -> Result<Option<Self::Bucket>, StoreError> {
        let p = self.root.join(bucket_name);
        if let Some(dir) = self.active_dirs.lock().get(&p) {
            return Ok(Some(dir.clone()));
        };

        if !p.exists() {
            return Ok(None);
        }
        if !p.is_dir() {
            return Err(StoreError::FilesystemError(
                "Bucket name is not a directory".to_string(),
            ));
        }
        let dir = Directory::new(self.root.clone(), p.clone(), DEFAULT_TTL);
        self.active_dirs.lock().insert(p, dir.clone());
        Ok(Some(dir))
    }

    /// Random id generated at construction; identifies this store instance.
    fn connection_id(&self) -> u64 {
        self.connection_id
    }

    /// Deactivate all buckets and delete every file created through this
    /// instance (files tracked in each bucket's owned set).
    fn shutdown(&self) {
        for (_, mut dir) in self.active_dirs.lock().drain() {
            if let Err(err) = dir.delete_owned_files() {
                tracing::error!(error = %err, %dir, "Failed shutdown delete of owned files");
            }
        }
    }
}
184
/// One bucket: a directory on disk holding one file per key.
#[derive(Clone)]
pub struct Directory {
    // Canonicalized store root; keys are entry paths relative to this.
    root: PathBuf,
    // Full path of this bucket's directory (root joined with bucket name).
    p: PathBuf,
    // Files whose mtime is older than this are removed by the expiry sweep.
    ttl: Duration,
    // Files written through this handle; kept alive by the expiry thread
    // and deleted on shutdown. Shared across clones of this handle.
    owned_files: Arc<Mutex<HashSet<PathBuf>>>,
}
193
194impl Directory {
195 fn new(root: PathBuf, p: PathBuf, ttl: Duration) -> Self {
196 let canonical_root = root.canonicalize().unwrap_or_else(|_| root.clone());
198 if ttl < MIN_KEEP_ALIVE {
199 let h_ttl = humantime::format_duration(ttl);
200 tracing::warn!(path = %p.display(), ttl = %h_ttl, "ttl is too short, increasing to {}", humantime::format_duration(MIN_KEEP_ALIVE));
201 }
202 let ttl = cmp::max(ttl, MIN_KEEP_ALIVE);
203 Directory {
204 root: canonical_root,
205 p,
206 ttl,
207 owned_files: Arc::new(Mutex::new(HashSet::new())),
208 }
209 }
210
211 fn keep_alive(&self) {
213 let owned_files = self.owned_files.lock().clone();
214 for path in owned_files {
215 let file = match OpenOptions::new().write(true).open(&path) {
216 Ok(f) => f,
217 Err(err) => {
218 tracing::error!(path = %path.display(), error = %err, "FileStore::keep_alive failed opening owned file");
219 continue;
220 }
221 };
222 if let Err(err) = file.set_modified(SystemTime::now()) {
223 tracing::error!(path = %path.display(), error = %err, "FileStore::keep_alive failed set_modified on owned file");
224 continue;
225 }
226 tracing::trace!("FileStore keep_alive set {}", path.display());
227 }
228 }
229
230 fn delete_expired_files(&self) -> anyhow::Result<()> {
235 let deadline = SystemTime::now() - self.ttl;
236 let dirname = self.p.display().to_string();
237 for entry in fs::read_dir(&self.p).with_context(|| dirname.clone())? {
238 let entry = match entry {
239 Ok(p) => p,
240 Err(err) => {
241 tracing::warn!(dir = dirname, error = %err, "File store could read directory contents");
242 continue;
243 }
244 };
245 if !entry.file_type().map(|f| f.is_file()).unwrap_or(false) {
246 tracing::warn!(dir = dirname, entry = %entry.path().display(), "File store directory should only contain files");
247 continue;
248 }
249 let ctx = entry.path().display().to_string();
250 let metadata = match entry.metadata() {
251 Ok(m) => m,
252 Err(err) => {
253 tracing::warn!(path = %ctx, error = %err, "Failed fetching metadata");
254 continue;
255 }
256 };
257 let last_modified = match metadata.modified() {
258 Ok(lm) => lm,
259 Err(err) => {
260 tracing::warn!(path = %ctx, error = %err, "Failed reading mtime");
263 continue;
264 }
265 };
266 if last_modified < deadline {
267 tracing::info!(path = ctx, ?last_modified, "Expired");
268 if let Err(err) = fs::remove_file(entry.path()) {
269 tracing::warn!(path = %ctx, error = %err, "Failed removing");
270 }
271 }
272 }
273 Ok(())
274 }
275
276 fn delete_owned_files(&mut self) -> anyhow::Result<()> {
277 let mut errs = Vec::new();
278 for p in self.owned_files.lock().drain() {
279 if let Err(err) = fs::remove_file(&p) {
280 errs.push(format!("{}: {err}", p.display()));
281 }
282 }
283 if !errs.is_empty() {
284 anyhow::bail!(errs.join(", "));
285 }
286 Ok(())
287 }
288}
289
290impl fmt::Display for Directory {
291 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
292 write!(f, "{}", self.p.display())
293 }
294}
295
#[async_trait]
impl Bucket for Directory {
    /// Write `value` under `key` as a file in this directory.
    ///
    /// The bytes go to a `.tmp_`-prefixed file first and are then renamed
    /// into place, so readers and watchers never observe a partial value.
    /// `_revision` is ignored; the returned revision is always 0.
    async fn insert(
        &self,
        key: &Key,
        value: bytes::Bytes,
        _revision: u64,
    ) -> Result<StoreOutcome, StoreError> {
        let safe_key = key.url_safe();
        let full_path = self.p.join(safe_key.as_ref());
        let str_path = full_path.display().to_string();

        // Random temp name avoids collisions between concurrent writers.
        let temp_name = format!("{TEMP_FILE_PREFIX}{:016x}", rand::random::<u64>());
        let temp_path = self.p.join(&temp_name);

        fs::write(&temp_path, &value)
            .with_context(|| format!("writing temp file {}", temp_path.display()))
            .map_err(a_to_fs_err)?;

        // Rename publishes the value; readers see old or new, not partial.
        fs::rename(&temp_path, &full_path)
            .with_context(|| format!("renaming {} to {}", temp_path.display(), str_path))
            .map_err(a_to_fs_err)?;

        // Track ownership so keep_alive refreshes it and shutdown deletes it.
        self.owned_files.lock().insert(full_path.clone());
        Ok(StoreOutcome::Created(0))
    }

    /// Read the value stored under `key`, or `None` if no such file exists.
    async fn get(&self, key: &Key) -> Result<Option<bytes::Bytes>, StoreError> {
        let safe_key = key.url_safe();
        let full_path = self.p.join(safe_key.as_ref());
        // NOTE(review): exists-then-read is racy with the expiry sweep; a
        // file removed in between surfaces as FilesystemError rather than
        // None. Confirm callers tolerate that.
        if !full_path.exists() {
            return Ok(None);
        }
        let str_path = full_path.display().to_string();
        let data: bytes::Bytes = fs::read(&full_path)
            .context(str_path)
            .map_err(a_to_fs_err)?
            .into();
        Ok(Some(data))
    }

    /// Remove `key`'s file, returning `MissingKey` if it does not exist.
    async fn delete(&self, key: &Key) -> Result<(), StoreError> {
        let safe_key = key.url_safe();
        let full_path = self.p.join(safe_key.as_ref());
        let str_path = full_path.display().to_string();
        if !full_path.exists() {
            return Err(StoreError::MissingKey(str_path));
        }

        // Forget ownership first so keep_alive does not try to touch a
        // file we are about to remove.
        self.owned_files.lock().remove(&full_path);

        fs::remove_file(&full_path)
            .context(str_path)
            .map_err(a_to_fs_err)
    }

    /// Stream Put/Delete events for files in this directory via the
    /// platform file watcher (`notify`). Temp files are filtered out; the
    /// returned stream owns the watcher.
    async fn watch(
        &self,
    ) -> Result<Pin<Box<dyn futures::Stream<Item = WatchEvent> + Send + 'life0>>, StoreError> {
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);

        // The notify callback runs on the watcher's own thread, hence
        // blocking_send into the async channel.
        let mut watcher = RecommendedWatcher::new(
            move |res: Result<Event, notify::Error>| {
                if let Err(err) = tx.blocking_send(res) {
                    tracing::error!(error = %err, "Failed to send file watch event");
                }
            },
            Config::default(),
        )
        .map_err(to_fs_err)?;

        watcher
            .watch(&self.p, RecursiveMode::NonRecursive)
            .map_err(to_fs_err)?;

        let dir = self.p.clone();
        let root = self.root.clone();

        Ok(Box::pin(async_stream::stream! {
            // Keep the watcher alive for the lifetime of the stream;
            // dropping it would stop event delivery.
            let _watcher = watcher;

            while let Some(event_result) = rx.recv().await {
                let event = match event_result {
                    Ok(event) => event,
                    Err(err) => {
                        tracing::error!(error = %err, "Failed receiving file watch event");
                        continue;
                    }
                };

                for item_path in event.paths {
                    if item_path == dir {
                        tracing::warn!("Unexpected event on the directory itself");
                        continue;
                    }

                    // Canonicalize so strip_prefix matches the canonical
                    // root; falls back to the raw path when the file is
                    // already gone (e.g. remove events).
                    let canonical_item_path = item_path.canonicalize().unwrap_or_else(|_| item_path.clone());

                    // The key is the path relative to the store root, so
                    // it carries the bucket-name prefix.
                    let key = match canonical_item_path.strip_prefix(&root) {
                        Ok(stripped) => Key::from_url_safe(&stripped.display().to_string()),
                        Err(err) => {
                            tracing::error!(
                                error = %err,
                                item_path = %canonical_item_path.display(),
                                root = %root.display(),
                                "Item in file store is not prefixed with file store root. Should be impossible. Ignoring invalid key.");
                            continue;
                        }
                    };

                    // Skip in-flight temp files created by insert's
                    // write-then-rename.
                    if item_path.file_name()
                        .map(|n| n.to_string_lossy().starts_with(TEMP_FILE_PREFIX))
                        .unwrap_or(false)
                    {
                        continue;
                    }

                    match event.kind {
                        // Events that give a file new content: creation,
                        // a data change, or a rename onto this path (the
                        // temp-file rename from insert lands here).
                        EventKind::Create(event::CreateKind::File)
                        | EventKind::Modify(event::ModifyKind::Data(event::DataChange::Content))
                        | EventKind::Modify(event::ModifyKind::Name(event::RenameMode::To)) => {
                            let data: bytes::Bytes = match fs::read(&item_path) {
                                Ok(data) => data.into(),
                                Err(err) => {
                                    tracing::warn!(error = %err, item = %item_path.display(), "Failed reading event item. Skipping.");
                                    continue;
                                }
                            };
                            let item = KeyValue::new(key, data);
                            yield WatchEvent::Put(item);
                        }
                        EventKind::Remove(event::RemoveKind::File) => {
                            yield WatchEvent::Delete(key);
                        }
                        // Other kinds (metadata, access, …) are not surfaced.
                        _ => {
                            continue;
                        }
                    }
                }
            }
        }))
    }

    /// Read every entry in this directory into a map.
    ///
    /// Keys are paths relative to the store root (so they include the
    /// bucket name); temp files and non-file entries are skipped.
    async fn entries(&self) -> Result<HashMap<Key, bytes::Bytes>, StoreError> {
        let contents = fs::read_dir(&self.p)
            .with_context(|| self.p.display().to_string())
            .map_err(a_to_fs_err)?;
        let mut out = HashMap::new();
        for entry in contents {
            let entry = entry.map_err(to_fs_err)?;
            if !entry.path().is_file() {
                tracing::warn!(
                    path = %entry.path().display(),
                    "Unexpected entry, directory should only contain files."
                );
                continue;
            }

            // Skip in-flight temp files from atomic writes.
            if entry
                .file_name()
                .to_string_lossy()
                .starts_with(TEMP_FILE_PREFIX)
            {
                continue;
            }

            // Canonicalize so strip_prefix against the canonical root works.
            let canonical_entry_path = match entry.path().canonicalize() {
                Ok(p) => p,
                Err(err) => {
                    tracing::warn!(error = %err, path = %entry.path().display(), "Failed to canonicalize path. Using original path.");
                    entry.path()
                }
            };

            let key = match canonical_entry_path.strip_prefix(&self.root) {
                Ok(p) => Key::from_url_safe(&p.to_string_lossy()),
                Err(err) => {
                    tracing::error!(
                        error = %err,
                        path = %canonical_entry_path.display(),
                        root = %self.root.display(),
                        "FileStore path not in root. Should be impossible. Skipping entry."
                    );
                    continue;
                }
            };
            let data: bytes::Bytes = fs::read(entry.path())
                .with_context(|| self.p.display().to_string())
                .map_err(a_to_fs_err)?
                .into();
            out.insert(key, data);
        }
        Ok(out)
    }
}
510
511fn a_to_fs_err(err: anyhow::Error) -> StoreError {
513 StoreError::FilesystemError(format!("{err:#}"))
514}
515
516fn to_fs_err<E: std::error::Error>(err: E) -> StoreError {
517 StoreError::FilesystemError(err.to_string())
518}
519
520#[cfg(test)]
521mod tests {
522 use std::collections::HashSet;
523
524 use tokio_util::sync::CancellationToken;
525
526 use crate::storage::kv::{Bucket as _, FileStore, Key, Store as _};
527
528 #[tokio::test]
529 async fn test_entries_full_path() {
530 let t = tempfile::tempdir().unwrap();
531
532 let cancel_token = CancellationToken::new();
533 let m = FileStore::new(cancel_token.clone(), t.path());
534 let bucket = m.get_or_create_bucket("v1/tests", None).await.unwrap();
535 let _ = bucket
536 .insert(&Key::new("key1/multi/part".to_string()), "value1".into(), 0)
537 .await
538 .unwrap();
539 let _ = bucket
540 .insert(&Key::new("key2".to_string()), "value2".into(), 0)
541 .await
542 .unwrap();
543 let entries = bucket.entries().await.unwrap();
544 let keys: HashSet<Key> = entries.into_keys().collect();
545 cancel_token.cancel(); assert!(keys.contains(&Key::new("v1/tests/key1/multi/part".to_string())));
548 assert!(keys.contains(&Key::new("v1/tests/key2".to_string())));
549 }
550}