1use crate::{DirectoryEntry, DirectoryListing, FileReadResult, FileSystemConfig, ReadParams};
2use anyhow::{anyhow, Context, Result};
3use bytes::Bytes;
4use futures::TryStreamExt;
5use object_store::path::Path;
6use object_store::ObjectStore;
7use std::sync::Arc;
8
/// File-system style facade over an [`ObjectStore`].
///
/// Paths passed to the methods of this type are `/`-separated strings; they are
/// validated and, when a root is configured, resolved beneath that root before
/// being handed to the underlying store.
#[derive(Debug)]
pub struct FileSystemStore {
    /// Shared handle to the backing object store used for every operation.
    store: Arc<dyn ObjectStore>,
    /// Optional prefix under which all paths are resolved; `None` means unscoped.
    root: Option<Path>,
}
15
16impl FileSystemStore {
17 pub async fn new(config: FileSystemConfig) -> Result<Self> {
19 let store = crate::object_store::build_object_store(&config.object_store)?;
20
21 let root = config.root_prefix.as_ref().and_then(|prefix| {
22 let trimmed = prefix.trim_matches('/');
23 if trimmed.is_empty() {
24 None
25 } else {
26 Some(Path::from(trimmed))
27 }
28 });
29
30 Ok(Self { store, root })
31 }
32
33 pub fn root_prefix(&self) -> Option<String> {
34 self.root.as_ref().map(|p| p.to_string())
35 }
36
37 pub fn scoped(&self, prefix: Option<&str>) -> Result<Self> {
38 let new_root = match prefix {
39 Some(extra) => Self::combine_prefixes(&self.root, extra)?,
40 None => self.root.clone(),
41 };
42 Ok(Self {
43 store: self.store.clone(),
44 root: new_root,
45 })
46 }
47
48 fn ensure_safe_path(path: &str) -> Result<&str> {
49 if path.split('/').any(|segment| segment == "..") {
50 return Err(anyhow!("path segments must not contain '..'"));
51 }
52 Ok(path)
53 }
54
55 fn combine_prefixes(base: &Option<Path>, extra: &str) -> Result<Option<Path>> {
56 let trimmed = extra.trim_matches('/');
57 if trimmed.is_empty() {
58 return Ok(base.clone());
59 }
60 Self::ensure_safe_path(trimmed)?;
61 let combined = match base {
62 Some(existing) => {
63 let mut prefix = existing.to_string();
64 if !prefix.ends_with('/') {
65 prefix.push('/');
66 }
67 prefix.push_str(trimmed);
68 Path::from(prefix)
69 }
70 None => Path::from(trimmed),
71 };
72 Ok(Some(combined))
73 }
74
75 fn sanitize_object_path(&self, path: &str) -> Result<Path> {
76 let trimmed = path.trim_matches('/');
77 if trimmed.is_empty() {
78 return Err(anyhow!("path cannot be empty"));
79 }
80 Self::ensure_safe_path(trimmed)?;
81 let normalized = match &self.root {
82 Some(root) if !root.as_ref().is_empty() => {
83 Path::from(format!("{}/{}", root, trimmed))
85 }
86 _ => Path::from(trimmed),
87 };
88 Ok(normalized)
89 }
90
91 fn sanitize_prefix(&self, path: &str) -> Result<Option<Path>> {
92 let trimmed = path.trim_matches('/');
93 if trimmed.is_empty() {
94 return Ok(self.root.clone());
95 }
96 Self::ensure_safe_path(trimmed)?;
97 let prefix = match &self.root {
98 Some(root) if !root.as_ref().is_empty() => {
99 Path::from(format!("{}/{}", root, trimmed))
101 }
102 _ => Path::from(trimmed),
103 };
104 Ok(Some(prefix))
105 }
106
107 fn prefix_depth(prefix: &Option<Path>) -> usize {
108 prefix.as_ref().map(|p| p.parts().count()).unwrap_or(0)
109 }
110
111 fn entry_name(prefix: &Option<Path>, entry: &Path) -> Option<String> {
112 let depth = Self::prefix_depth(prefix);
113 entry
114 .parts()
115 .nth(depth)
116 .map(|component| component.as_ref().to_string())
117 }
118
119 fn build_listing_entry(
120 prefix: &Option<Path>,
121 entry: &Path,
122 is_dir: bool,
123 size: Option<u64>,
124 ) -> Option<DirectoryEntry> {
125 Self::entry_name(prefix, entry).map(|name| DirectoryEntry {
126 name,
127 is_file: !is_dir,
128 is_dir,
129 size,
130 })
131 }
132
133 fn read_to_string(bytes: Bytes, path: &str) -> Result<String> {
134 String::from_utf8(bytes.to_vec())
135 .with_context(|| format!("failed to decode file {} as utf-8", path))
136 }
137
138 pub async fn read_raw(&self, path: &str) -> Result<String> {
140 let object_path = self
141 .sanitize_object_path(path)
142 .with_context(|| format!("invalid file path: {}", path))?;
143 tracing::debug!("object_path: {}", object_path.to_string());
144 let get_result = self
145 .store
146 .get(&object_path)
147 .await
148 .map_err(|e| {
149 tracing::error!("#{e}");
150 e
151 })
152 .with_context(|| format!("failed to fetch object for {path}"))?;
153 let bytes = get_result
154 .bytes()
155 .await
156 .with_context(|| format!("failed to read bytes for {path}"))?;
157 Self::read_to_string(bytes, path)
158 }
159
160 pub async fn read_with_line_numbers(
162 &self,
163 path: &str,
164 params: ReadParams,
165 ) -> Result<FileReadResult> {
166 let content = self.read_raw(path).await?;
167
168 let lines: Vec<&str> = content.lines().collect();
169 let total_lines = lines.len() as u64;
170
171 if total_lines == 0 {
172 return Ok(FileReadResult {
173 content: String::new(),
174 start_line: 0,
175 end_line: 0,
176 total_lines: 0,
177 });
178 }
179
180 let start = params.start_line.unwrap_or(1);
181 let end = params.end_line.unwrap_or(total_lines);
182
183 if start > total_lines || end > total_lines || start > end || start == 0 {
184 return Err(anyhow!(
185 "invalid line range: {}-{} for file {} with {} total lines",
186 start,
187 end,
188 path,
189 total_lines
190 ));
191 }
192
193 let start_idx = (start - 1) as usize;
194 let end_idx = end as usize;
195 let selected_lines = &lines[start_idx..end_idx];
196 let content_with_lines = selected_lines
197 .iter()
198 .enumerate()
199 .map(|(i, line)| format!("{:4}→{}", start + i as u64, line))
200 .collect::<Vec<_>>()
201 .join("\n");
202
203 Ok(FileReadResult {
204 content: content_with_lines,
205 start_line: start,
206 end_line: end,
207 total_lines,
208 })
209 }
210
211 pub async fn read(&self, path: &str, params: ReadParams) -> Result<FileReadResult> {
213 self.read_with_line_numbers(path, params).await
214 }
215
216 pub async fn write(&self, path: &str, content: &str) -> Result<()> {
217 let object_path = self
218 .sanitize_object_path(path)
219 .with_context(|| format!("invalid file path: {}", path))?;
220 tracing::debug!("object_path: {}", object_path.to_string());
221 self.store
222 .put(&object_path, Bytes::from(content.to_owned()))
223 .await
224 .map_err(|e| {
225 tracing::error!("#{e}");
226 e
227 })
228 .with_context(|| format!("failed to write file {}", path))?;
229 Ok(())
230 }
231
232 pub async fn write_binary(&self, path: &str, content: &[u8]) -> Result<()> {
233 let object_path = self
234 .sanitize_object_path(path)
235 .with_context(|| format!("invalid binary path: {}", path))?;
236 tracing::debug!("object_path: {}", object_path.to_string());
237 self.store
238 .put(&object_path, Bytes::copy_from_slice(content))
239 .await
240 .with_context(|| format!("failed to write binary file {}", path))?;
241 Ok(())
242 }
243
244 pub async fn read_binary(&self, path: &str) -> Result<Vec<u8>> {
245 let object_path = self
246 .sanitize_object_path(path)
247 .with_context(|| format!("invalid binary path: {}", path))?;
248 let get_result = self
249 .store
250 .get(&object_path)
251 .await
252 .map_err(|e| {
253 tracing::error!("#{e}");
254 e
255 })
256 .with_context(|| format!("failed to fetch binary object for {}", path))?;
257 let bytes = get_result
258 .bytes()
259 .await
260 .map_err(|e| {
261 tracing::error!("#{e}");
262 e
263 })
264 .with_context(|| format!("failed to read binary bytes for {}", path))?;
265 Ok(bytes.to_vec())
266 }
267
268 pub async fn list(&self, path: &str) -> Result<DirectoryListing> {
269 let prefix = self.sanitize_prefix(path)?;
270 let list_result = match &prefix {
271 Some(prefix) => self
272 .store
273 .list_with_delimiter(Some(prefix))
274 .await
275 .map_err(|e| {
276 tracing::error!("#{e}");
277 e
278 })
279 .with_context(|| format!("failed to list directory {}", path))?,
280 None => self
281 .store
282 .list_with_delimiter(None)
283 .await
284 .map_err(|e| {
285 tracing::error!("#{e}");
286 e
287 })
288 .context("failed to list root directory")?,
289 };
290
291 let mut entries: Vec<DirectoryEntry> = Vec::new();
292
293 for common_prefix in list_result.common_prefixes {
294 if let Some(entry) = Self::build_listing_entry(&prefix, &common_prefix, true, None) {
295 entries.push(entry);
296 }
297 }
298
299 for object in list_result.objects {
300 if let Some(entry) = Self::build_listing_entry(
301 &prefix,
302 &object.location,
303 false,
304 Some(object.size as u64),
305 ) {
306 entries.push(entry);
307 }
308 }
309
310 entries.sort_by(|a, b| a.name.cmp(&b.name));
311 entries.dedup_by(|a, b| a.name == b.name && a.is_dir == b.is_dir);
312
313 Ok(DirectoryListing {
314 path: path.to_string(),
315 entries,
316 })
317 }
318
319 pub async fn delete(&self, path: &str, recursive: bool) -> Result<()> {
320 let object_path = self
321 .sanitize_object_path(path)
322 .with_context(|| format!("invalid delete path: {}", path))?;
323
324 if recursive {
325 let prefix = self
326 .sanitize_prefix(path)?
327 .ok_or_else(|| anyhow!("cannot delete root prefix recursively"))?;
328
329 let mut stream = self.store.list(Some(&prefix));
330 while let Some(meta) = stream.try_next().await? {
331 self.store.delete(&meta.location).await.with_context(|| {
332 format!(
333 "failed to delete object {} while deleting directory {}",
334 meta.location, path
335 )
336 })?;
337 }
338 return Ok(());
339 }
340
341 match self.store.delete(&object_path).await {
342 Ok(()) => Ok(()),
343 Err(object_store::Error::NotFound { .. }) => {
344 let prefix = self.sanitize_prefix(path)?;
345 if let Some(prefix) = prefix {
346 let mut stream = self.store.list(Some(&prefix));
347 if stream.try_next().await?.is_none() {
348 return Ok(());
349 }
350 }
351 Err(anyhow!("path {} not found", path))
352 }
353 Err(err) => Err(err).with_context(|| format!("failed to delete path {}", path)),
354 }
355 }
356
357 pub async fn copy(&self, source: &str, destination: &str) -> Result<()> {
358 let content = self
359 .read(source, ReadParams::default())
360 .await
361 .with_context(|| format!("failed to read source file {}", source))?;
362 self.write(destination, &content.content).await
363 }
364
365 pub async fn move_file(&self, source: &str, destination: &str) -> Result<()> {
366 self.copy(source, destination).await?;
367 self.delete(source, false).await
368 }
369
370 pub async fn info(&self, path: &str) -> Result<distri_types::filesystem::FileMetadata> {
371 let object_path = self
372 .sanitize_object_path(path)
373 .with_context(|| format!("invalid info path: {}", path))?;
374 let metadata = self
375 .store
376 .head(&object_path)
377 .await
378 .with_context(|| format!("failed to fetch metadata for {}", path))?;
379
380 let preview_content = self
381 .read(path, ReadParams::default())
382 .await
383 .ok()
384 .map(|result| result.content);
385
386 Ok(distri_types::filesystem::FileMetadata {
387 file_id: uuid::Uuid::new_v4().to_string(),
388 relative_path: path.trim_start_matches('/').to_string(),
389 size: metadata.size as u64,
390 content_type: None,
391 original_filename: path.split('/').last().map(|s| s.to_string()),
392 created_at: metadata.last_modified,
393 updated_at: metadata.last_modified,
394 checksum: metadata.e_tag,
395 stats: None,
396 preview: preview_content,
397 })
398 }
399
400 pub async fn tree(&self, path: &str) -> Result<DirectoryListing> {
401 self.list(path).await
402 }
403}