dynamo_runtime/storage/key_value_store/
file.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::HashSet;
5use std::ffi::OsString;
6use std::fmt;
7use std::fs;
8use std::os::unix::ffi::OsStrExt;
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use std::time::Duration;
12use std::{collections::HashMap, pin::Pin};
13
14use anyhow::Context as _;
15use async_trait::async_trait;
16use futures::StreamExt;
17use inotify::{Event, EventMask, EventStream, Inotify, WatchMask};
18use parking_lot::Mutex;
19
20use crate::storage::key_value_store::KeyValue;
21
22use super::{Key, KeyValueBucket, KeyValueStore, StoreError, StoreOutcome, WatchEvent};
23
24/// Treat as a singleton
25#[derive(Clone)]
26pub struct FileStore {
27    root: PathBuf,
28    connection_id: u64,
29    /// Directories we may have created files in, for shutdown cleanup
30    /// Arc so that we only ever have one map here after clone
31    active_dirs: Arc<Mutex<HashMap<PathBuf, Directory>>>,
32}
33
34impl FileStore {
35    pub(super) fn new<P: Into<PathBuf>>(root_dir: P) -> Self {
36        FileStore {
37            root: root_dir.into(),
38            connection_id: rand::random::<u64>(),
39            active_dirs: Arc::new(Mutex::new(HashMap::new())),
40        }
41    }
42}
43
44#[async_trait]
45impl KeyValueStore for FileStore {
46    type Bucket = Directory;
47
48    /// A "bucket" is a directory
49    async fn get_or_create_bucket(
50        &self,
51        bucket_name: &str,
52        _ttl: Option<Duration>, // TODO ttl not used yet
53    ) -> Result<Self::Bucket, StoreError> {
54        let p = self.root.join(bucket_name);
55        if let Some(dir) = self.active_dirs.lock().get(&p) {
56            return Ok(dir.clone());
57        };
58
59        if p.exists() {
60            // Get
61            if !p.is_dir() {
62                return Err(StoreError::FilesystemError(
63                    "Bucket name is not a directory".to_string(),
64                ));
65            }
66        } else {
67            // Create
68            fs::create_dir_all(&p).map_err(to_fs_err)?;
69        }
70        let dir = Directory::new(self.root.clone(), p.clone());
71        self.active_dirs.lock().insert(p, dir.clone());
72        Ok(dir)
73    }
74
75    /// A "bucket" is a directory
76    async fn get_bucket(&self, bucket_name: &str) -> Result<Option<Self::Bucket>, StoreError> {
77        let p = self.root.join(bucket_name);
78        if let Some(dir) = self.active_dirs.lock().get(&p) {
79            return Ok(Some(dir.clone()));
80        };
81
82        if !p.exists() {
83            return Ok(None);
84        }
85        if !p.is_dir() {
86            return Err(StoreError::FilesystemError(
87                "Bucket name is not a directory".to_string(),
88            ));
89        }
90        let dir = Directory::new(self.root.clone(), p.clone());
91        self.active_dirs.lock().insert(p, dir.clone());
92        Ok(Some(dir))
93    }
94
95    fn connection_id(&self) -> u64 {
96        self.connection_id
97    }
98
99    // This cannot be a Drop imp because DistributedRuntime is cloned various places including
100    // Python. Drop doesn't get called.
101    fn shutdown(&self) {
102        for (_, mut dir) in self.active_dirs.lock().drain() {
103            if let Err(err) = dir.delete_owned_files() {
104                tracing::error!(error = %err, %dir, "Failed shutdown delete of owned files");
105            }
106        }
107    }
108}
109
110#[derive(Clone)]
111pub struct Directory {
112    root: PathBuf,
113    p: PathBuf,
114    /// These are the files we created and hence must delete on shutdown
115    owned_files: Arc<Mutex<HashSet<PathBuf>>>,
116}
117
118impl Directory {
119    fn new(root: PathBuf, p: PathBuf) -> Self {
120        Directory {
121            root,
122            p,
123            owned_files: Arc::new(Mutex::new(HashSet::new())),
124        }
125    }
126
127    fn delete_owned_files(&mut self) -> anyhow::Result<()> {
128        let mut errs = Vec::new();
129        for p in self.owned_files.lock().drain() {
130            if let Err(err) = fs::remove_file(&p) {
131                errs.push(format!("{}: {err}", p.display()));
132            }
133        }
134        if !errs.is_empty() {
135            anyhow::bail!(errs.join(", "));
136        }
137        Ok(())
138    }
139}
140
141impl fmt::Display for Directory {
142    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
143        write!(f, "{}", self.p.display())
144    }
145}
146
147#[async_trait]
148impl KeyValueBucket for Directory {
149    /// Write a file to the directory
150    async fn insert(
151        &self,
152        key: &Key,
153        value: bytes::Bytes,
154        _revision: u64, // Not used. Maybe put in file name?
155    ) -> Result<StoreOutcome, StoreError> {
156        let safe_key = Key::new(key.as_ref()); // because of from_raw
157        let full_path = self.p.join(safe_key.as_ref());
158        self.owned_files.lock().insert(full_path.clone());
159        let str_path = full_path.display().to_string();
160        fs::write(&full_path, &value)
161            .context(str_path)
162            .map_err(a_to_fs_err)?;
163        Ok(StoreOutcome::Created(0))
164    }
165
166    /// Read a file from the directory
167    async fn get(&self, key: &Key) -> Result<Option<bytes::Bytes>, StoreError> {
168        let safe_key = Key::new(key.as_ref()); // because of from_raw
169        let full_path = self.p.join(safe_key.as_ref());
170        if !full_path.exists() {
171            return Ok(None);
172        }
173        let str_path = full_path.display().to_string();
174        let data: bytes::Bytes = fs::read(&full_path)
175            .context(str_path)
176            .map_err(a_to_fs_err)?
177            .into();
178        Ok(Some(data))
179    }
180
181    /// Delete a file from the directory
182    async fn delete(&self, key: &Key) -> Result<(), StoreError> {
183        let safe_key = Key::new(key.as_ref()); // because of from_raw
184        let full_path = self.p.join(safe_key.as_ref());
185        let str_path = full_path.display().to_string();
186        if !full_path.exists() {
187            return Err(StoreError::MissingKey(str_path));
188        }
189
190        self.owned_files.lock().remove(&full_path);
191
192        fs::remove_file(&full_path)
193            .context(str_path)
194            .map_err(a_to_fs_err)
195    }
196
197    async fn watch(
198        &self,
199    ) -> Result<Pin<Box<dyn futures::Stream<Item = WatchEvent> + Send + 'life0>>, StoreError> {
200        let inotify = Inotify::init().map_err(to_fs_err)?;
201        inotify
202            .watches()
203            .add(
204                &self.p,
205                WatchMask::MODIFY | WatchMask::CREATE | WatchMask::DELETE,
206            )
207            .map_err(to_fs_err)?;
208
209        let dir = self.p.clone();
210        Ok(Box::pin(async_stream::stream! {
211            let mut buffer = [0; 1024];
212            let mut events = match inotify.into_event_stream(&mut buffer) {
213                Ok(events) => events,
214                Err(err) => {
215                    tracing::error!(error = %err, "Failed getting event stream from inotify");
216                    return;
217                }
218            };
219            while let Some(Ok(event)) = events.next().await {
220                let Some(name) = event.name else {
221                    tracing::warn!("Unexpected event on the directory itself");
222                    continue;
223                };
224                let item_path = dir.join(name);
225                let key = match item_path.strip_prefix(&self.root) {
226                    Ok(stripped) => stripped.display().to_string().replace("_", "/"),
227                    Err(err) => {
228                        // Possibly this should be a panic.
229                        // A key cannot be outside the file store root.
230                        tracing::error!(
231                            error = %err,
232                            item_path = %item_path.display(),
233                            root = %self.root.display(),
234                            "Item in file store is not prefixed with file store root. Should be impossible. Ignoring invalid key.");
235                        continue;
236                    }
237                };
238
239                match event.mask {
240                    EventMask::MODIFY | EventMask::CREATE => {
241                        let data: bytes::Bytes = match fs::read(&item_path) {
242                            Ok(data) => data.into(),
243                            Err(err) => {
244                                tracing::warn!(error = %err, item = %item_path.display(), "Failed reading event item. Skipping.");
245                                continue;
246                            }
247                        };
248                        let item = KeyValue::new(key, data);
249                        yield WatchEvent::Put(item);
250                    }
251                    EventMask::DELETE => {
252                        yield WatchEvent::Delete(Key::from_raw(key));
253                    }
254                    event_type => {
255                        tracing::warn!(?event_type, dir = %dir.display(), "Unexpected event type");
256                        continue;
257                    }
258                }
259            }
260        }))
261    }
262
263    async fn entries(&self) -> Result<HashMap<String, bytes::Bytes>, StoreError> {
264        let contents = fs::read_dir(&self.p)
265            .with_context(|| self.p.display().to_string())
266            .map_err(a_to_fs_err)?;
267        let mut out = HashMap::new();
268        for entry in contents {
269            let entry = entry.map_err(to_fs_err)?;
270            if !entry.path().is_file() {
271                tracing::warn!(
272                    path = %entry.path().display(),
273                    "Unexpected entry, directory should only contain files."
274                );
275                continue;
276            }
277
278            let key = match entry.path().strip_prefix(&self.root) {
279                Ok(p) => p.to_string_lossy().to_string().replace("_", "/"),
280                Err(err) => {
281                    tracing::error!(
282                        error = %err,
283                        path = %entry.path().display(),
284                        root = %self.root.display(),
285                        "FileStore path not in root. Should be impossible. Skipping entry."
286                    );
287                    continue;
288                }
289            };
290            let data: bytes::Bytes = fs::read(entry.path())
291                .with_context(|| self.p.display().to_string())
292                .map_err(a_to_fs_err)?
293                .into();
294            out.insert(key, data);
295        }
296        Ok(out)
297    }
298}
299
300// For anyhow preserve the context
301fn a_to_fs_err(err: anyhow::Error) -> StoreError {
302    StoreError::FilesystemError(format!("{err:#}"))
303}
304
305fn to_fs_err<E: std::error::Error>(err: E) -> StoreError {
306    StoreError::FilesystemError(err.to_string())
307}