// dynamo_runtime/storage/kv/file.rs
1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::cmp;
5use std::collections::HashSet;
6use std::ffi::OsString;
7use std::fmt;
8use std::fs;
9use std::fs::OpenOptions;
10use std::os::unix::ffi::OsStrExt;
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13use std::thread;
14use std::time::Duration;
15use std::time::SystemTime;
16use std::{collections::HashMap, pin::Pin};
17
18use anyhow::Context as _;
19use async_trait::async_trait;
20use futures::StreamExt;
21use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher, event};
22use parking_lot::Mutex;
23use tokio_util::sync::CancellationToken;
24
25use super::{Bucket, Key, KeyValue, Store, StoreError, StoreOutcome, WatchEvent};
26
27/// How long until a key expires. We keep the keys alive by touching the files.
28/// 10s is the same as our etcd lease expiry.
29const DEFAULT_TTL: Duration = Duration::from_secs(10);
30
31/// Don't do keep-alive any more often than this. Limits the disk write load.
32const MIN_KEEP_ALIVE: Duration = Duration::from_secs(1);
33
34/// Prefix for temporary files used in atomic writes.
35/// Files with this prefix are ignored by the watcher.
36const TEMP_FILE_PREFIX: &str = ".tmp_";
37
/// Treat as a singleton
#[derive(Clone)]
pub struct FileStore {
    /// Signals shutdown to the background keep-alive/expiry thread.
    cancel_token: CancellationToken,
    /// Root directory; buckets are subdirectories of this path.
    root: PathBuf,
    /// Random id generated at construction, identifying this store instance.
    connection_id: u64,
    /// Directories we may have created files in, for shutdown cleanup and keep-alive.
    /// Arc so that we only ever have one map here after clone.
    active_dirs: Arc<Mutex<HashMap<PathBuf, Directory>>>,
}
48
49impl FileStore {
50    pub(super) fn new<P: Into<PathBuf>>(cancel_token: CancellationToken, root_dir: P) -> Self {
51        let fs = FileStore {
52            cancel_token,
53            root: root_dir.into(),
54            connection_id: rand::random::<u64>(),
55            active_dirs: Arc::new(Mutex::new(HashMap::new())),
56        };
57        let c = fs.clone();
58        thread::spawn(move || c.expiry_thread());
59        fs
60    }
61
62    /// Keep our files alive and delete expired keys.
63    ///
64    /// Does not return until cancellation token cancelled. On shutdown the process will
65    /// often exit before we detect cancellation. That's fine.
66    /// We run this in a real thread so it doesn't get delayed by tokio runtime under heavy load.
67    fn expiry_thread(&self) {
68        loop {
69            let ttl = self.shortest_ttl();
70            let keep_alive_interval = cmp::max(ttl / 3, MIN_KEEP_ALIVE);
71
72            // Check before and after the sleep
73            if self.cancel_token.is_cancelled() {
74                break;
75            }
76
77            thread::sleep(keep_alive_interval);
78
79            if self.cancel_token.is_cancelled() {
80                break;
81            }
82
83            self.keep_alive();
84            if let Err(err) = self.delete_expired_files() {
85                tracing::error!(error = %err, "FileStore delete_expired_files");
86            }
87        }
88    }
89
90    /// The shortest TTL of any directory we are using.
91    fn shortest_ttl(&self) -> Duration {
92        let mut ttl = DEFAULT_TTL;
93        let active_dirs = self.active_dirs.lock().clone();
94        for (_, dir) in active_dirs {
95            ttl = cmp::min(ttl, dir.ttl);
96        }
97        tracing::trace!("FileStore expiry shortest ttl {ttl:?}");
98        ttl
99    }
100
101    fn keep_alive(&self) {
102        let active_dirs = self.active_dirs.lock().clone();
103        for (_, dir) in active_dirs {
104            dir.keep_alive();
105        }
106    }
107
108    fn delete_expired_files(&self) -> anyhow::Result<()> {
109        let active_dirs = self.active_dirs.lock().clone();
110        for (path, dir) in active_dirs {
111            dir.delete_expired_files()
112                .with_context(|| path.display().to_string())?;
113        }
114        Ok(())
115    }
116}
117
#[async_trait]
impl Store for FileStore {
    type Bucket = Directory;

    /// A "bucket" is a directory
    ///
    /// Returns the cached `Directory` if one exists for this path; otherwise
    /// opens (or creates) the directory on disk and caches it. `ttl` only
    /// applies when the bucket is newly cached; a cached bucket keeps its TTL.
    async fn get_or_create_bucket(
        &self,
        bucket_name: &str,
        ttl: Option<Duration>,
    ) -> Result<Self::Bucket, StoreError> {
        let p = self.root.join(bucket_name);
        // Fast path: reuse the cached Directory so owned-file tracking is shared.
        if let Some(dir) = self.active_dirs.lock().get(&p) {
            return Ok(dir.clone());
        };

        // NOTE(review): the lock is released between the check above and the
        // insert below, so two concurrent callers could each build a Directory
        // and one would overwrite the other in the map, leaving their
        // owned_files sets divergent. Presumably benign for current usage —
        // confirm if buckets are created concurrently.
        if p.exists() {
            // Get
            if !p.is_dir() {
                return Err(StoreError::FilesystemError(
                    "Bucket name is not a directory".to_string(),
                ));
            }
        } else {
            // Create
            fs::create_dir_all(&p).map_err(to_fs_err)?;
        }
        let dir = Directory::new(self.root.clone(), p.clone(), ttl.unwrap_or(DEFAULT_TTL));
        self.active_dirs.lock().insert(p, dir.clone());
        Ok(dir)
    }

    /// A "bucket" is a directory
    ///
    /// Like `get_or_create_bucket`, but returns `None` instead of creating the
    /// directory when it does not exist on disk.
    async fn get_bucket(&self, bucket_name: &str) -> Result<Option<Self::Bucket>, StoreError> {
        let p = self.root.join(bucket_name);
        if let Some(dir) = self.active_dirs.lock().get(&p) {
            return Ok(Some(dir.clone()));
        };

        if !p.exists() {
            return Ok(None);
        }
        if !p.is_dir() {
            return Err(StoreError::FilesystemError(
                "Bucket name is not a directory".to_string(),
            ));
        }
        // The filesystem itself doesn't store the TTL so for now default it
        let dir = Directory::new(self.root.clone(), p.clone(), DEFAULT_TTL);
        self.active_dirs.lock().insert(p, dir.clone());
        Ok(Some(dir))
    }

    /// Random id generated at construction; identifies this store instance.
    fn connection_id(&self) -> u64 {
        self.connection_id
    }

    // This cannot be a Drop imp because DistributedRuntime is cloned various places including
    // Python. Drop doesn't get called.
    fn shutdown(&self) {
        // Drain the map so repeated shutdown calls are no-ops, then delete every
        // file this instance created. Errors are logged, not propagated.
        for (_, mut dir) in self.active_dirs.lock().drain() {
            if let Err(err) = dir.delete_owned_files() {
                tracing::error!(error = %err, %dir, "Failed shutdown delete of owned files");
            }
        }
    }
}
184
/// A bucket backed by a single filesystem directory.
#[derive(Clone)]
pub struct Directory {
    /// Canonicalized store root; keys are file paths relative to this.
    root: PathBuf,
    /// This bucket's directory (root joined with the bucket name).
    p: PathBuf,
    /// Files older than this (by mtime) are deleted by the expiry sweep.
    ttl: Duration,
    /// These are the files we created and hence must delete on shutdown
    owned_files: Arc<Mutex<HashSet<PathBuf>>>,
}
193
194impl Directory {
195    fn new(root: PathBuf, p: PathBuf, ttl: Duration) -> Self {
196        // Canonicalize root to handle symlinks (e.g., /var -> /private/var on macOS)
197        let canonical_root = root.canonicalize().unwrap_or_else(|_| root.clone());
198        if ttl < MIN_KEEP_ALIVE {
199            let h_ttl = humantime::format_duration(ttl);
200            tracing::warn!(path = %p.display(), ttl = %h_ttl, "ttl is too short, increasing to {}", humantime::format_duration(MIN_KEEP_ALIVE));
201        }
202        let ttl = cmp::max(ttl, MIN_KEEP_ALIVE);
203        Directory {
204            root: canonical_root,
205            p,
206            ttl,
207            owned_files: Arc::new(Mutex::new(HashSet::new())),
208        }
209    }
210
211    /// touch the files we own so they don't get deleted by a different FileStore
212    fn keep_alive(&self) {
213        let owned_files = self.owned_files.lock().clone();
214        for path in owned_files {
215            let file = match OpenOptions::new().write(true).open(&path) {
216                Ok(f) => f,
217                Err(err) => {
218                    tracing::error!(path = %path.display(), error = %err, "FileStore::keep_alive failed opening owned file");
219                    continue;
220                }
221            };
222            if let Err(err) = file.set_modified(SystemTime::now()) {
223                tracing::error!(path = %path.display(), error = %err, "FileStore::keep_alive failed set_modified on owned file");
224                continue;
225            }
226            tracing::trace!("FileStore keep_alive set {}", path.display());
227        }
228    }
229
230    /// Remove any files not touched for longer than TTL.
231    /// This looks at all files in the directory to catch orphaned files from processes that didn't stop cleanly.
232    /// Returns an error if we cannot open the directory. Errors inside the directory are logged
233    /// but non-fatal.
234    fn delete_expired_files(&self) -> anyhow::Result<()> {
235        let deadline = SystemTime::now() - self.ttl;
236        let dirname = self.p.display().to_string();
237        for entry in fs::read_dir(&self.p).with_context(|| dirname.clone())? {
238            let entry = match entry {
239                Ok(p) => p,
240                Err(err) => {
241                    tracing::warn!(dir = dirname, error = %err, "File store could read directory contents");
242                    continue;
243                }
244            };
245            if !entry.file_type().map(|f| f.is_file()).unwrap_or(false) {
246                tracing::warn!(dir = dirname, entry = %entry.path().display(), "File store directory should only contain files");
247                continue;
248            }
249            let ctx = entry.path().display().to_string();
250            let metadata = match entry.metadata() {
251                Ok(m) => m,
252                Err(err) => {
253                    tracing::warn!(path = %ctx, error = %err, "Failed fetching metadata");
254                    continue;
255                }
256            };
257            let last_modified = match metadata.modified() {
258                Ok(lm) => lm,
259                Err(err) => {
260                    // We should only get an error on platforms with no mtime, which we don't
261                    // support anyway.
262                    tracing::warn!(path = %ctx, error = %err, "Failed reading mtime");
263                    continue;
264                }
265            };
266            if last_modified < deadline {
267                tracing::info!(path = ctx, ?last_modified, "Expired");
268                if let Err(err) = fs::remove_file(entry.path()) {
269                    tracing::warn!(path = %ctx, error = %err, "Failed removing");
270                }
271            }
272        }
273        Ok(())
274    }
275
276    fn delete_owned_files(&mut self) -> anyhow::Result<()> {
277        let mut errs = Vec::new();
278        for p in self.owned_files.lock().drain() {
279            if let Err(err) = fs::remove_file(&p) {
280                errs.push(format!("{}: {err}", p.display()));
281            }
282        }
283        if !errs.is_empty() {
284            anyhow::bail!(errs.join(", "));
285        }
286        Ok(())
287    }
288}
289
290impl fmt::Display for Directory {
291    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
292        write!(f, "{}", self.p.display())
293    }
294}
295
296#[async_trait]
297impl Bucket for Directory {
298    /// Write a file to the directory using atomic write (temp file + rename).
299    /// This ensures watchers never see a partially written file.
300    async fn insert(
301        &self,
302        key: &Key,
303        value: bytes::Bytes,
304        _revision: u64, // Not used. Maybe put in file name?
305    ) -> Result<StoreOutcome, StoreError> {
306        let safe_key = key.url_safe();
307        let full_path = self.p.join(safe_key.as_ref());
308        let str_path = full_path.display().to_string();
309
310        // Use atomic write: write to temp file, then rename.
311        // This prevents watchers from seeing partially written files.
312        let temp_name = format!("{TEMP_FILE_PREFIX}{:016x}", rand::random::<u64>());
313        let temp_path = self.p.join(&temp_name);
314
315        // Write to temp file first
316        fs::write(&temp_path, &value)
317            .with_context(|| format!("writing temp file {}", temp_path.display()))
318            .map_err(a_to_fs_err)?;
319
320        // Atomic rename to target path
321        fs::rename(&temp_path, &full_path)
322            .with_context(|| format!("renaming {} to {}", temp_path.display(), str_path))
323            .map_err(a_to_fs_err)?;
324
325        self.owned_files.lock().insert(full_path.clone());
326        Ok(StoreOutcome::Created(0))
327    }
328
329    /// Read a file from the directory
330    async fn get(&self, key: &Key) -> Result<Option<bytes::Bytes>, StoreError> {
331        let safe_key = key.url_safe();
332        let full_path = self.p.join(safe_key.as_ref());
333        if !full_path.exists() {
334            return Ok(None);
335        }
336        let str_path = full_path.display().to_string();
337        let data: bytes::Bytes = fs::read(&full_path)
338            .context(str_path)
339            .map_err(a_to_fs_err)?
340            .into();
341        Ok(Some(data))
342    }
343
344    /// Delete a file from the directory
345    async fn delete(&self, key: &Key) -> Result<(), StoreError> {
346        let safe_key = key.url_safe();
347        let full_path = self.p.join(safe_key.as_ref());
348        let str_path = full_path.display().to_string();
349        if !full_path.exists() {
350            return Err(StoreError::MissingKey(str_path));
351        }
352
353        self.owned_files.lock().remove(&full_path);
354
355        fs::remove_file(&full_path)
356            .context(str_path)
357            .map_err(a_to_fs_err)
358    }
359
360    async fn watch(
361        &self,
362    ) -> Result<Pin<Box<dyn futures::Stream<Item = WatchEvent> + Send + 'life0>>, StoreError> {
363        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
364
365        let mut watcher = RecommendedWatcher::new(
366            move |res: Result<Event, notify::Error>| {
367                if let Err(err) = tx.blocking_send(res) {
368                    tracing::error!(error = %err, "Failed to send file watch event");
369                }
370            },
371            Config::default(),
372        )
373        .map_err(to_fs_err)?;
374
375        watcher
376            .watch(&self.p, RecursiveMode::NonRecursive)
377            .map_err(to_fs_err)?;
378
379        let dir = self.p.clone();
380        let root = self.root.clone();
381
382        Ok(Box::pin(async_stream::stream! {
383            // Keep watcher alive for the duration of the stream
384            let _watcher = watcher;
385
386            while let Some(event_result) = rx.recv().await {
387                let event = match event_result {
388                    Ok(event) => event,
389                    Err(err) => {
390                        tracing::error!(error = %err, "Failed receiving file watch event");
391                        continue;
392                    }
393                };
394
395                for item_path in event.paths {
396                    // Skip if the event is for the directory itself
397                    if item_path == dir {
398                        tracing::warn!("Unexpected event on the directory itself");
399                        continue;
400                    }
401
402                    // Canonicalize paths to handle symlinks (e.g., /var -> /private/var on macOS)
403                    // The unwrap_or_else path is for Remove case.
404                    let canonical_item_path = item_path.canonicalize().unwrap_or_else(|_| item_path.clone());
405
406                    let key = match canonical_item_path.strip_prefix(&root) {
407                        Ok(stripped) => Key::from_url_safe(&stripped.display().to_string()),
408                        Err(err) => {
409                            // Possibly this should be a panic.
410                            // A key cannot be outside the file store root.
411                            tracing::error!(
412                                error = %err,
413                                item_path = %canonical_item_path.display(),
414                                root = %root.display(),
415                                "Item in file store is not prefixed with file store root. Should be impossible. Ignoring invalid key.");
416                            continue;
417                        }
418                    };
419
420                    // Skip temp files used for atomic writes
421                    if item_path.file_name()
422                        .map(|n| n.to_string_lossy().starts_with(TEMP_FILE_PREFIX))
423                        .unwrap_or(false)
424                    {
425                        continue;
426                    }
427
428                    match event.kind {
429                        // Handle file creation, modification, and rename-to (from atomic writes)
430                        EventKind::Create(event::CreateKind::File)
431                        | EventKind::Modify(event::ModifyKind::Data(event::DataChange::Content))
432                        | EventKind::Modify(event::ModifyKind::Name(event::RenameMode::To)) => {
433                            let data: bytes::Bytes = match fs::read(&item_path) {
434                                Ok(data) => data.into(),
435                                Err(err) => {
436                                    tracing::warn!(error = %err, item = %item_path.display(), "Failed reading event item. Skipping.");
437                                    continue;
438                                }
439                            };
440                            let item = KeyValue::new(key, data);
441                            yield WatchEvent::Put(item);
442                        }
443                        EventKind::Remove(event::RemoveKind::File) => {
444                            yield WatchEvent::Delete(key);
445                        }
446                        _ => {
447                            // These happen every time the keep-alive updates last modified time
448                            continue;
449                        }
450                    }
451                }
452            }
453        }))
454    }
455
456    async fn entries(&self) -> Result<HashMap<Key, bytes::Bytes>, StoreError> {
457        let contents = fs::read_dir(&self.p)
458            .with_context(|| self.p.display().to_string())
459            .map_err(a_to_fs_err)?;
460        let mut out = HashMap::new();
461        for entry in contents {
462            let entry = entry.map_err(to_fs_err)?;
463            if !entry.path().is_file() {
464                tracing::warn!(
465                    path = %entry.path().display(),
466                    "Unexpected entry, directory should only contain files."
467                );
468                continue;
469            }
470
471            // Skip temp files used for atomic writes
472            if entry
473                .file_name()
474                .to_string_lossy()
475                .starts_with(TEMP_FILE_PREFIX)
476            {
477                continue;
478            }
479
480            // Canonicalize paths to handle symlinks (e.g., /var -> /private/var on macOS)
481            let canonical_entry_path = match entry.path().canonicalize() {
482                Ok(p) => p,
483                Err(err) => {
484                    tracing::warn!(error = %err, path = %entry.path().display(), "Failed to canonicalize path. Using original path.");
485                    entry.path()
486                }
487            };
488
489            let key = match canonical_entry_path.strip_prefix(&self.root) {
490                Ok(p) => Key::from_url_safe(&p.to_string_lossy()),
491                Err(err) => {
492                    tracing::error!(
493                        error = %err,
494                        path = %canonical_entry_path.display(),
495                        root = %self.root.display(),
496                        "FileStore path not in root. Should be impossible. Skipping entry."
497                    );
498                    continue;
499                }
500            };
501            let data: bytes::Bytes = fs::read(entry.path())
502                .with_context(|| self.p.display().to_string())
503                .map_err(a_to_fs_err)?
504                .into();
505            out.insert(key, data);
506        }
507        Ok(out)
508    }
509}
510
511// For anyhow preserve the context
512fn a_to_fs_err(err: anyhow::Error) -> StoreError {
513    StoreError::FilesystemError(format!("{err:#}"))
514}
515
516fn to_fs_err<E: std::error::Error>(err: E) -> StoreError {
517    StoreError::FilesystemError(err.to_string())
518}
519
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use tokio_util::sync::CancellationToken;

    use crate::storage::kv::{Bucket as _, FileStore, Key, Store as _};

    /// Keys returned by `entries` are full paths relative to the store root,
    /// including the bucket name.
    #[tokio::test]
    async fn test_entries_full_path() {
        let tmp = tempfile::tempdir().unwrap();
        let cancel = CancellationToken::new();
        let store = FileStore::new(cancel.clone(), tmp.path());

        let bucket = store.get_or_create_bucket("v1/tests", None).await.unwrap();
        for (name, value) in [("key1/multi/part", "value1"), ("key2", "value2")] {
            bucket
                .insert(&Key::new(name.to_string()), value.into(), 0)
                .await
                .unwrap();
        }

        let keys: HashSet<Key> = bucket.entries().await.unwrap().into_keys().collect();
        cancel.cancel(); // stop the background thread

        for expected in ["v1/tests/key1/multi/part", "v1/tests/key2"] {
            assert!(keys.contains(&Key::new(expected.to_string())));
        }
    }
}
550}