Skip to main content

dynamo_runtime/storage/kv/
file.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::cmp;
5use std::collections::HashSet;
6use std::ffi::OsString;
7use std::fmt;
8use std::fs;
9use std::fs::OpenOptions;
10use std::os::unix::ffi::OsStrExt;
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13use std::thread;
14use std::time::Duration;
15use std::time::SystemTime;
16use std::{collections::HashMap, pin::Pin};
17
18use anyhow::Context as _;
19use async_trait::async_trait;
20use futures::StreamExt;
21use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher, event};
22use parking_lot::Mutex;
23use tokio_util::sync::CancellationToken;
24
25use super::{Bucket, Key, KeyValue, Store, StoreError, StoreOutcome, WatchEvent};
26
/// How long until a key expires. We keep the keys alive by touching the files.
/// 10s is the same as our etcd lease expiry.
const DEFAULT_TTL: Duration = Duration::from_secs(10);

/// Don't do keep-alive any more often than this. Limits the disk write load.
/// TTLs shorter than this are clamped up in `Directory::new`.
const MIN_KEEP_ALIVE: Duration = Duration::from_secs(1);
33
/// Filesystem-backed key/value store. Treat as a singleton.
///
/// Cloning is cheap; all clones share the same `active_dirs` map.
#[derive(Clone)]
pub struct FileStore {
    // Signals the background expiry thread (started in `new`) to stop.
    cancel_token: CancellationToken,
    // Root directory under which all buckets (subdirectories) live.
    root: PathBuf,
    // Random id identifying this store instance.
    connection_id: u64,
    /// Directories we may have created files in, for shutdown cleanup and keep-alive.
    /// Arc so that we only ever have one map here after clone.
    active_dirs: Arc<Mutex<HashMap<PathBuf, Directory>>>,
}
44
45impl FileStore {
46    pub(super) fn new<P: Into<PathBuf>>(cancel_token: CancellationToken, root_dir: P) -> Self {
47        let fs = FileStore {
48            cancel_token,
49            root: root_dir.into(),
50            connection_id: rand::random::<u64>(),
51            active_dirs: Arc::new(Mutex::new(HashMap::new())),
52        };
53        let c = fs.clone();
54        thread::spawn(move || c.expiry_thread());
55        fs
56    }
57
58    /// Keep our files alive and delete expired keys.
59    ///
60    /// Does not return until cancellation token cancelled. On shutdown the process will
61    /// often exit before we detect cancellation. That's fine.
62    /// We run this in a real thread so it doesn't get delayed by tokio runtime under heavy load.
63    fn expiry_thread(&self) {
64        loop {
65            let ttl = self.shortest_ttl();
66            let keep_alive_interval = cmp::max(ttl / 3, MIN_KEEP_ALIVE);
67
68            // Check before and after the sleep
69            if self.cancel_token.is_cancelled() {
70                break;
71            }
72
73            thread::sleep(keep_alive_interval);
74
75            if self.cancel_token.is_cancelled() {
76                break;
77            }
78
79            self.keep_alive();
80            if let Err(err) = self.delete_expired_files() {
81                tracing::error!(error = %err, "FileStore delete_expired_files");
82            }
83        }
84    }
85
86    /// The shortest TTL of any directory we are using.
87    fn shortest_ttl(&self) -> Duration {
88        let mut ttl = DEFAULT_TTL;
89        let active_dirs = self.active_dirs.lock().clone();
90        for (_, dir) in active_dirs {
91            ttl = cmp::min(ttl, dir.ttl);
92        }
93        tracing::trace!("FileStore expiry shortest ttl {ttl:?}");
94        ttl
95    }
96
97    fn keep_alive(&self) {
98        let active_dirs = self.active_dirs.lock().clone();
99        for (_, dir) in active_dirs {
100            dir.keep_alive();
101        }
102    }
103
104    fn delete_expired_files(&self) -> anyhow::Result<()> {
105        let active_dirs = self.active_dirs.lock().clone();
106        for (path, dir) in active_dirs {
107            dir.delete_expired_files()
108                .with_context(|| path.display().to_string())?;
109        }
110        Ok(())
111    }
112}
113
114#[async_trait]
115impl Store for FileStore {
116    type Bucket = Directory;
117
118    /// A "bucket" is a directory
119    async fn get_or_create_bucket(
120        &self,
121        bucket_name: &str,
122        ttl: Option<Duration>,
123    ) -> Result<Self::Bucket, StoreError> {
124        let p = self.root.join(bucket_name);
125        if let Some(dir) = self.active_dirs.lock().get(&p) {
126            return Ok(dir.clone());
127        };
128
129        if p.exists() {
130            // Get
131            if !p.is_dir() {
132                return Err(StoreError::FilesystemError(
133                    "Bucket name is not a directory".to_string(),
134                ));
135            }
136        } else {
137            // Create
138            fs::create_dir_all(&p).map_err(to_fs_err)?;
139        }
140        let dir = Directory::new(self.root.clone(), p.clone(), ttl.unwrap_or(DEFAULT_TTL));
141        self.active_dirs.lock().insert(p, dir.clone());
142        Ok(dir)
143    }
144
145    /// A "bucket" is a directory
146    async fn get_bucket(&self, bucket_name: &str) -> Result<Option<Self::Bucket>, StoreError> {
147        let p = self.root.join(bucket_name);
148        if let Some(dir) = self.active_dirs.lock().get(&p) {
149            return Ok(Some(dir.clone()));
150        };
151
152        if !p.exists() {
153            return Ok(None);
154        }
155        if !p.is_dir() {
156            return Err(StoreError::FilesystemError(
157                "Bucket name is not a directory".to_string(),
158            ));
159        }
160        // The filesystem itself doesn't store the TTL so for now default it
161        let dir = Directory::new(self.root.clone(), p.clone(), DEFAULT_TTL);
162        self.active_dirs.lock().insert(p, dir.clone());
163        Ok(Some(dir))
164    }
165
166    fn connection_id(&self) -> u64 {
167        self.connection_id
168    }
169
170    // This cannot be a Drop imp because DistributedRuntime is cloned various places including
171    // Python. Drop doesn't get called.
172    fn shutdown(&self) {
173        for (_, mut dir) in self.active_dirs.lock().drain() {
174            if let Err(err) = dir.delete_owned_files() {
175                tracing::error!(error = %err, %dir, "Failed shutdown delete of owned files");
176            }
177        }
178    }
179}
180
/// One bucket: a single directory of key files. Clones share the same
/// `owned_files` set.
#[derive(Clone)]
pub struct Directory {
    // Canonicalized store root; keys are paths relative to this.
    root: PathBuf,
    // Full path of this bucket's directory.
    p: PathBuf,
    // Files untouched for longer than this are deleted by the expiry sweep.
    ttl: Duration,
    /// These are the files we created and hence must delete on shutdown
    owned_files: Arc<Mutex<HashSet<PathBuf>>>,
}
189
190impl Directory {
191    fn new(root: PathBuf, p: PathBuf, ttl: Duration) -> Self {
192        // Canonicalize root to handle symlinks (e.g., /var -> /private/var on macOS)
193        let canonical_root = root.canonicalize().unwrap_or_else(|_| root.clone());
194        if ttl < MIN_KEEP_ALIVE {
195            let h_ttl = humantime::format_duration(ttl);
196            tracing::warn!(path = %p.display(), ttl = %h_ttl, "ttl is too short, increasing to {}", humantime::format_duration(MIN_KEEP_ALIVE));
197        }
198        let ttl = cmp::max(ttl, MIN_KEEP_ALIVE);
199        Directory {
200            root: canonical_root,
201            p,
202            ttl,
203            owned_files: Arc::new(Mutex::new(HashSet::new())),
204        }
205    }
206
207    /// touch the files we own so they don't get deleted by a different FileStore
208    fn keep_alive(&self) {
209        let owned_files = self.owned_files.lock().clone();
210        for path in owned_files {
211            let file = match OpenOptions::new().write(true).open(&path) {
212                Ok(f) => f,
213                Err(err) => {
214                    tracing::error!(path = %path.display(), error = %err, "FileStore::keep_alive failed opening owned file");
215                    continue;
216                }
217            };
218            if let Err(err) = file.set_modified(SystemTime::now()) {
219                tracing::error!(path = %path.display(), error = %err, "FileStore::keep_alive failed set_modified on owned file");
220                continue;
221            }
222            tracing::trace!("FileStore keep_alive set {}", path.display());
223        }
224    }
225
226    /// Remove any files not touched for longer than TTL.
227    /// This looks at all files in the directory to catch orphaned files from processes that didn't stop cleanly.
228    /// Returns an error if we cannot open the directory. Errors inside the directory are logged
229    /// but non-fatal.
230    fn delete_expired_files(&self) -> anyhow::Result<()> {
231        let deadline = SystemTime::now() - self.ttl;
232        let dirname = self.p.display().to_string();
233        for entry in fs::read_dir(&self.p).with_context(|| dirname.clone())? {
234            let entry = match entry {
235                Ok(p) => p,
236                Err(err) => {
237                    tracing::warn!(dir = dirname, error = %err, "File store could read directory contents");
238                    continue;
239                }
240            };
241            if !entry.file_type().map(|f| f.is_file()).unwrap_or(false) {
242                tracing::warn!(dir = dirname, entry = %entry.path().display(), "File store directory should only contain files");
243                continue;
244            }
245            let ctx = entry.path().display().to_string();
246            let metadata = match entry.metadata() {
247                Ok(m) => m,
248                Err(err) => {
249                    tracing::warn!(path = %ctx, error = %err, "Failed fetching metadata");
250                    continue;
251                }
252            };
253            let last_modified = match metadata.modified() {
254                Ok(lm) => lm,
255                Err(err) => {
256                    // We should only get an error on platforms with no mtime, which we don't
257                    // support anyway.
258                    tracing::warn!(path = %ctx, error = %err, "Failed reading mtime");
259                    continue;
260                }
261            };
262            if last_modified < deadline {
263                tracing::info!(path = ctx, ?last_modified, "Expired");
264                if let Err(err) = fs::remove_file(entry.path()) {
265                    tracing::warn!(path = %ctx, error = %err, "Failed removing");
266                }
267            }
268        }
269        Ok(())
270    }
271
272    fn delete_owned_files(&mut self) -> anyhow::Result<()> {
273        let mut errs = Vec::new();
274        for p in self.owned_files.lock().drain() {
275            if let Err(err) = fs::remove_file(&p) {
276                errs.push(format!("{}: {err}", p.display()));
277            }
278        }
279        if !errs.is_empty() {
280            anyhow::bail!(errs.join(", "));
281        }
282        Ok(())
283    }
284}
285
286impl fmt::Display for Directory {
287    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
288        write!(f, "{}", self.p.display())
289    }
290}
291
292#[async_trait]
293impl Bucket for Directory {
294    /// Write a file to the directory
295    async fn insert(
296        &self,
297        key: &Key,
298        value: bytes::Bytes,
299        _revision: u64, // Not used. Maybe put in file name?
300    ) -> Result<StoreOutcome, StoreError> {
301        let safe_key = key.url_safe();
302        let full_path = self.p.join(safe_key.as_ref());
303        self.owned_files.lock().insert(full_path.clone());
304        let str_path = full_path.display().to_string();
305        fs::write(&full_path, &value)
306            .context(str_path)
307            .map_err(a_to_fs_err)?;
308        Ok(StoreOutcome::Created(0))
309    }
310
311    /// Read a file from the directory
312    async fn get(&self, key: &Key) -> Result<Option<bytes::Bytes>, StoreError> {
313        let safe_key = key.url_safe();
314        let full_path = self.p.join(safe_key.as_ref());
315        if !full_path.exists() {
316            return Ok(None);
317        }
318        let str_path = full_path.display().to_string();
319        let data: bytes::Bytes = fs::read(&full_path)
320            .context(str_path)
321            .map_err(a_to_fs_err)?
322            .into();
323        Ok(Some(data))
324    }
325
326    /// Delete a file from the directory
327    async fn delete(&self, key: &Key) -> Result<(), StoreError> {
328        let safe_key = key.url_safe();
329        let full_path = self.p.join(safe_key.as_ref());
330        let str_path = full_path.display().to_string();
331        if !full_path.exists() {
332            return Err(StoreError::MissingKey(str_path));
333        }
334
335        self.owned_files.lock().remove(&full_path);
336
337        fs::remove_file(&full_path)
338            .context(str_path)
339            .map_err(a_to_fs_err)
340    }
341
342    async fn watch(
343        &self,
344    ) -> Result<Pin<Box<dyn futures::Stream<Item = WatchEvent> + Send + 'life0>>, StoreError> {
345        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
346
347        let mut watcher = RecommendedWatcher::new(
348            move |res: Result<Event, notify::Error>| {
349                if let Err(err) = tx.blocking_send(res) {
350                    tracing::error!(error = %err, "Failed to send file watch event");
351                }
352            },
353            Config::default(),
354        )
355        .map_err(to_fs_err)?;
356
357        watcher
358            .watch(&self.p, RecursiveMode::NonRecursive)
359            .map_err(to_fs_err)?;
360
361        let dir = self.p.clone();
362        let root = self.root.clone();
363
364        Ok(Box::pin(async_stream::stream! {
365            // Keep watcher alive for the duration of the stream
366            let _watcher = watcher;
367
368            while let Some(event_result) = rx.recv().await {
369                let event = match event_result {
370                    Ok(event) => event,
371                    Err(err) => {
372                        tracing::error!(error = %err, "Failed receiving file watch event");
373                        continue;
374                    }
375                };
376
377                for item_path in event.paths {
378                    // Skip if the event is for the directory itself
379                    if item_path == dir {
380                        tracing::warn!("Unexpected event on the directory itself");
381                        continue;
382                    }
383
384                    // Canonicalize paths to handle symlinks (e.g., /var -> /private/var on macOS)
385                    // The unwrap_or_else path is for Remove case.
386                    let canonical_item_path = item_path.canonicalize().unwrap_or_else(|_| item_path.clone());
387
388                    let key = match canonical_item_path.strip_prefix(&root) {
389                        Ok(stripped) => Key::from_url_safe(&stripped.display().to_string()),
390                        Err(err) => {
391                            // Possibly this should be a panic.
392                            // A key cannot be outside the file store root.
393                            tracing::error!(
394                                error = %err,
395                                item_path = %canonical_item_path.display(),
396                                root = %root.display(),
397                                "Item in file store is not prefixed with file store root. Should be impossible. Ignoring invalid key.");
398                            continue;
399                        }
400                    };
401
402                    match event.kind {
403                        EventKind::Create(event::CreateKind::File) | EventKind::Modify(event::ModifyKind::Data(event::DataChange::Content)) => {
404                            let data: bytes::Bytes = match fs::read(&item_path) {
405                                Ok(data) => data.into(),
406                                Err(err) => {
407                                    tracing::warn!(error = %err, item = %item_path.display(), "Failed reading event item. Skipping.");
408                                    continue;
409                                }
410                            };
411                            let item = KeyValue::new(key, data);
412                            yield WatchEvent::Put(item);
413                        }
414                        EventKind::Remove(event::RemoveKind::File) => {
415                            yield WatchEvent::Delete(key);
416                        }
417                        _ => {
418                            // These happen every time the keep-alive updates last modified time
419                            continue;
420                        }
421                    }
422                }
423            }
424        }))
425    }
426
427    async fn entries(&self) -> Result<HashMap<Key, bytes::Bytes>, StoreError> {
428        let contents = fs::read_dir(&self.p)
429            .with_context(|| self.p.display().to_string())
430            .map_err(a_to_fs_err)?;
431        let mut out = HashMap::new();
432        for entry in contents {
433            let entry = entry.map_err(to_fs_err)?;
434            if !entry.path().is_file() {
435                tracing::warn!(
436                    path = %entry.path().display(),
437                    "Unexpected entry, directory should only contain files."
438                );
439                continue;
440            }
441
442            // Canonicalize paths to handle symlinks (e.g., /var -> /private/var on macOS)
443            let canonical_entry_path = match entry.path().canonicalize() {
444                Ok(p) => p,
445                Err(err) => {
446                    tracing::warn!(error = %err, path = %entry.path().display(), "Failed to canonicalize path. Using original path.");
447                    entry.path()
448                }
449            };
450
451            let key = match canonical_entry_path.strip_prefix(&self.root) {
452                Ok(p) => Key::from_url_safe(&p.to_string_lossy()),
453                Err(err) => {
454                    tracing::error!(
455                        error = %err,
456                        path = %canonical_entry_path.display(),
457                        root = %self.root.display(),
458                        "FileStore path not in root. Should be impossible. Skipping entry."
459                    );
460                    continue;
461                }
462            };
463            let data: bytes::Bytes = fs::read(entry.path())
464                .with_context(|| self.p.display().to_string())
465                .map_err(a_to_fs_err)?
466                .into();
467            out.insert(key, data);
468        }
469        Ok(out)
470    }
471}
472
473// For anyhow preserve the context
474fn a_to_fs_err(err: anyhow::Error) -> StoreError {
475    StoreError::FilesystemError(format!("{err:#}"))
476}
477
478fn to_fs_err<E: std::error::Error>(err: E) -> StoreError {
479    StoreError::FilesystemError(err.to_string())
480}
481
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use tokio_util::sync::CancellationToken;

    use crate::storage::kv::{Bucket as _, FileStore, Key, Store as _};

    /// Keys returned by `entries` carry the full path relative to the store
    /// root (bucket prefix included), not just the file name.
    #[tokio::test]
    async fn test_entries_full_path() {
        let tmp = tempfile::tempdir().unwrap();

        let cancel = CancellationToken::new();
        let store = FileStore::new(cancel.clone(), tmp.path());
        let bucket = store.get_or_create_bucket("v1/tests", None).await.unwrap();
        let _ = bucket
            .insert(&Key::new("key1/multi/part".to_string()), "value1".into(), 0)
            .await
            .unwrap();
        let _ = bucket
            .insert(&Key::new("key2".to_string()), "value2".into(), 0)
            .await
            .unwrap();
        let found: HashSet<Key> = bucket.entries().await.unwrap().into_keys().collect();
        cancel.cancel(); // stop the background thread

        assert!(found.contains(&Key::new("v1/tests/key1/multi/part".to_string())));
        assert!(found.contains(&Key::new("v1/tests/key2".to_string())));
    }
}
512}