Skip to main content

distri_filesystem/
store.rs

1use crate::{DirectoryEntry, DirectoryListing, FileReadResult, FileSystemConfig, ReadParams};
2use anyhow::{anyhow, Context, Result};
3use bytes::Bytes;
4use futures::TryStreamExt;
5use object_store::path::Path;
6use object_store::ObjectStore;
7use std::sync::Arc;
8
9/// Object-store backed file storage implementation
10#[derive(Debug)]
11pub struct FileSystemStore {
12    store: Arc<dyn ObjectStore>,
13    root: Option<Path>,
14}
15
16impl FileSystemStore {
17    /// Create a new FileSystemStore using the provided configuration
18    pub async fn new(config: FileSystemConfig) -> Result<Self> {
19        let store = crate::object_store::build_object_store(&config.object_store)?;
20
21        let root = config.root_prefix.as_ref().and_then(|prefix| {
22            let trimmed = prefix.trim_matches('/');
23            if trimmed.is_empty() {
24                None
25            } else {
26                Some(Path::from(trimmed))
27            }
28        });
29
30        Ok(Self { store, root })
31    }
32
33    pub fn root_prefix(&self) -> Option<String> {
34        self.root.as_ref().map(|p| p.to_string())
35    }
36
37    pub fn scoped(&self, prefix: Option<&str>) -> Result<Self> {
38        let new_root = match prefix {
39            Some(extra) => Self::combine_prefixes(&self.root, extra)?,
40            None => self.root.clone(),
41        };
42        Ok(Self {
43            store: self.store.clone(),
44            root: new_root,
45        })
46    }
47
48    fn ensure_safe_path(path: &str) -> Result<&str> {
49        if path.split('/').any(|segment| segment == "..") {
50            return Err(anyhow!("path segments must not contain '..'"));
51        }
52        Ok(path)
53    }
54
55    fn combine_prefixes(base: &Option<Path>, extra: &str) -> Result<Option<Path>> {
56        let trimmed = extra.trim_matches('/');
57        if trimmed.is_empty() {
58            return Ok(base.clone());
59        }
60        Self::ensure_safe_path(trimmed)?;
61        let combined = match base {
62            Some(existing) => {
63                let mut prefix = existing.to_string();
64                if !prefix.ends_with('/') {
65                    prefix.push('/');
66                }
67                prefix.push_str(trimmed);
68                Path::from(prefix)
69            }
70            None => Path::from(trimmed),
71        };
72        Ok(Some(combined))
73    }
74
75    fn sanitize_object_path(&self, path: &str) -> Result<Path> {
76        let trimmed = path.trim_matches('/');
77        if trimmed.is_empty() {
78            return Err(anyhow!("path cannot be empty"));
79        }
80        Self::ensure_safe_path(trimmed)?;
81        let normalized = match &self.root {
82            Some(root) if !root.as_ref().is_empty() => {
83                // Join the root prefix with the relative path while preserving separators
84                Path::from(format!("{}/{}", root, trimmed))
85            }
86            _ => Path::from(trimmed),
87        };
88        Ok(normalized)
89    }
90
91    fn sanitize_prefix(&self, path: &str) -> Result<Option<Path>> {
92        let trimmed = path.trim_matches('/');
93        if trimmed.is_empty() {
94            return Ok(self.root.clone());
95        }
96        Self::ensure_safe_path(trimmed)?;
97        let prefix = match &self.root {
98            Some(root) if !root.as_ref().is_empty() => {
99                // Combine the root prefix with the requested path using path separators
100                Path::from(format!("{}/{}", root, trimmed))
101            }
102            _ => Path::from(trimmed),
103        };
104        Ok(Some(prefix))
105    }
106
107    fn prefix_depth(prefix: &Option<Path>) -> usize {
108        prefix.as_ref().map(|p| p.parts().count()).unwrap_or(0)
109    }
110
111    fn entry_name(prefix: &Option<Path>, entry: &Path) -> Option<String> {
112        let depth = Self::prefix_depth(prefix);
113        entry
114            .parts()
115            .nth(depth)
116            .map(|component| component.as_ref().to_string())
117    }
118
119    fn build_listing_entry(
120        prefix: &Option<Path>,
121        entry: &Path,
122        is_dir: bool,
123        size: Option<u64>,
124    ) -> Option<DirectoryEntry> {
125        Self::entry_name(prefix, entry).map(|name| DirectoryEntry {
126            name,
127            is_file: !is_dir,
128            is_dir,
129            size,
130        })
131    }
132
133    fn read_to_string(bytes: Bytes, path: &str) -> Result<String> {
134        String::from_utf8(bytes.to_vec())
135            .with_context(|| format!("failed to decode file {} as utf-8", path))
136    }
137
138    /// Read file content as raw string without line numbers
139    pub async fn read_raw(&self, path: &str) -> Result<String> {
140        let object_path = self
141            .sanitize_object_path(path)
142            .with_context(|| format!("invalid file path: {}", path))?;
143        tracing::debug!("object_path: {}", object_path.to_string());
144        let get_result = self
145            .store
146            .get(&object_path)
147            .await
148            .map_err(|e| {
149                tracing::error!("#{e}");
150                e
151            })
152            .with_context(|| format!("failed to fetch object for {path}"))?;
153        let bytes = get_result
154            .bytes()
155            .await
156            .with_context(|| format!("failed to read bytes for {path}"))?;
157        Self::read_to_string(bytes, path)
158    }
159
160    /// Read file content with line numbers and optional line range
161    pub async fn read_with_line_numbers(
162        &self,
163        path: &str,
164        params: ReadParams,
165    ) -> Result<FileReadResult> {
166        let content = self.read_raw(path).await?;
167
168        let lines: Vec<&str> = content.lines().collect();
169        let total_lines = lines.len() as u64;
170
171        if total_lines == 0 {
172            return Ok(FileReadResult {
173                content: String::new(),
174                start_line: 0,
175                end_line: 0,
176                total_lines: 0,
177            });
178        }
179
180        let start = params.start_line.unwrap_or(1);
181        let end = params.end_line.unwrap_or(total_lines);
182
183        if start > total_lines || end > total_lines || start > end || start == 0 {
184            return Err(anyhow!(
185                "invalid line range: {}-{} for file {} with {} total lines",
186                start,
187                end,
188                path,
189                total_lines
190            ));
191        }
192
193        let start_idx = (start - 1) as usize;
194        let end_idx = end as usize;
195        let selected_lines = &lines[start_idx..end_idx];
196        let content_with_lines = selected_lines
197            .iter()
198            .enumerate()
199            .map(|(i, line)| format!("{:4}→{}", start + i as u64, line))
200            .collect::<Vec<_>>()
201            .join("\n");
202
203        Ok(FileReadResult {
204            content: content_with_lines,
205            start_line: start,
206            end_line: end,
207            total_lines,
208        })
209    }
210
211    /// Read file with optional line range (backward compatibility - uses read_with_line_numbers)
212    pub async fn read(&self, path: &str, params: ReadParams) -> Result<FileReadResult> {
213        self.read_with_line_numbers(path, params).await
214    }
215
216    pub async fn write(&self, path: &str, content: &str) -> Result<()> {
217        let object_path = self
218            .sanitize_object_path(path)
219            .with_context(|| format!("invalid file path: {}", path))?;
220        tracing::debug!("object_path: {}", object_path.to_string());
221        self.store
222            .put(&object_path, Bytes::from(content.to_owned()))
223            .await
224            .map_err(|e| {
225                tracing::error!("#{e}");
226                e
227            })
228            .with_context(|| format!("failed to write file {}", path))?;
229        Ok(())
230    }
231
232    pub async fn write_binary(&self, path: &str, content: &[u8]) -> Result<()> {
233        let object_path = self
234            .sanitize_object_path(path)
235            .with_context(|| format!("invalid binary path: {}", path))?;
236        tracing::debug!("object_path: {}", object_path.to_string());
237        self.store
238            .put(&object_path, Bytes::copy_from_slice(content))
239            .await
240            .with_context(|| format!("failed to write binary file {}", path))?;
241        Ok(())
242    }
243
244    pub async fn read_binary(&self, path: &str) -> Result<Vec<u8>> {
245        let object_path = self
246            .sanitize_object_path(path)
247            .with_context(|| format!("invalid binary path: {}", path))?;
248        let get_result = self
249            .store
250            .get(&object_path)
251            .await
252            .map_err(|e| {
253                tracing::error!("#{e}");
254                e
255            })
256            .with_context(|| format!("failed to fetch binary object for {}", path))?;
257        let bytes = get_result
258            .bytes()
259            .await
260            .map_err(|e| {
261                tracing::error!("#{e}");
262                e
263            })
264            .with_context(|| format!("failed to read binary bytes for {}", path))?;
265        Ok(bytes.to_vec())
266    }
267
268    pub async fn list(&self, path: &str) -> Result<DirectoryListing> {
269        let prefix = self.sanitize_prefix(path)?;
270        let list_result = match &prefix {
271            Some(prefix) => self
272                .store
273                .list_with_delimiter(Some(prefix))
274                .await
275                .map_err(|e| {
276                    tracing::error!("#{e}");
277                    e
278                })
279                .with_context(|| format!("failed to list directory {}", path))?,
280            None => self
281                .store
282                .list_with_delimiter(None)
283                .await
284                .map_err(|e| {
285                    tracing::error!("#{e}");
286                    e
287                })
288                .context("failed to list root directory")?,
289        };
290
291        let mut entries: Vec<DirectoryEntry> = Vec::new();
292
293        for common_prefix in list_result.common_prefixes {
294            if let Some(entry) = Self::build_listing_entry(&prefix, &common_prefix, true, None) {
295                entries.push(entry);
296            }
297        }
298
299        for object in list_result.objects {
300            if let Some(entry) = Self::build_listing_entry(
301                &prefix,
302                &object.location,
303                false,
304                Some(object.size as u64),
305            ) {
306                entries.push(entry);
307            }
308        }
309
310        entries.sort_by(|a, b| a.name.cmp(&b.name));
311        entries.dedup_by(|a, b| a.name == b.name && a.is_dir == b.is_dir);
312
313        Ok(DirectoryListing {
314            path: path.to_string(),
315            entries,
316        })
317    }
318
319    pub async fn delete(&self, path: &str, recursive: bool) -> Result<()> {
320        let object_path = self
321            .sanitize_object_path(path)
322            .with_context(|| format!("invalid delete path: {}", path))?;
323
324        if recursive {
325            let prefix = self
326                .sanitize_prefix(path)?
327                .ok_or_else(|| anyhow!("cannot delete root prefix recursively"))?;
328
329            let mut stream = self.store.list(Some(&prefix));
330            while let Some(meta) = stream.try_next().await? {
331                self.store.delete(&meta.location).await.with_context(|| {
332                    format!(
333                        "failed to delete object {} while deleting directory {}",
334                        meta.location, path
335                    )
336                })?;
337            }
338            return Ok(());
339        }
340
341        match self.store.delete(&object_path).await {
342            Ok(()) => Ok(()),
343            Err(object_store::Error::NotFound { .. }) => {
344                let prefix = self.sanitize_prefix(path)?;
345                if let Some(prefix) = prefix {
346                    let mut stream = self.store.list(Some(&prefix));
347                    if stream.try_next().await?.is_none() {
348                        return Ok(());
349                    }
350                }
351                Err(anyhow!("path {} not found", path))
352            }
353            Err(err) => Err(err).with_context(|| format!("failed to delete path {}", path)),
354        }
355    }
356
357    pub async fn copy(&self, source: &str, destination: &str) -> Result<()> {
358        let content = self
359            .read(source, ReadParams::default())
360            .await
361            .with_context(|| format!("failed to read source file {}", source))?;
362        self.write(destination, &content.content).await
363    }
364
365    pub async fn move_file(&self, source: &str, destination: &str) -> Result<()> {
366        self.copy(source, destination).await?;
367        self.delete(source, false).await
368    }
369
370    pub async fn info(&self, path: &str) -> Result<distri_types::filesystem::FileMetadata> {
371        let object_path = self
372            .sanitize_object_path(path)
373            .with_context(|| format!("invalid info path: {}", path))?;
374        let metadata = self
375            .store
376            .head(&object_path)
377            .await
378            .with_context(|| format!("failed to fetch metadata for {}", path))?;
379
380        let preview_content = self
381            .read(path, ReadParams::default())
382            .await
383            .ok()
384            .map(|result| result.content);
385
386        Ok(distri_types::filesystem::FileMetadata {
387            file_id: uuid::Uuid::new_v4().to_string(),
388            relative_path: path.trim_start_matches('/').to_string(),
389            size: metadata.size as u64,
390            content_type: None,
391            original_filename: path.split('/').last().map(|s| s.to_string()),
392            created_at: metadata.last_modified,
393            updated_at: metadata.last_modified,
394            checksum: metadata.e_tag,
395            stats: None,
396            preview: preview_content,
397        })
398    }
399
400    pub async fn tree(&self, path: &str) -> Result<DirectoryListing> {
401        self.list(path).await
402    }
403}