Skip to main content

secure_exec_vfs_core/engine/engines/
chunked.rs

1use crate::engine::block::BlockStore;
2use crate::engine::error::{VfsError, VfsResult};
3use crate::engine::metadata::MetadataStore;
4use crate::engine::types::{
5    normalize_path, BlockKey, ChunkEdit, ChunkRange, CreateInodeAttrs, Dentry, InodeMeta,
6    InodePatch, InodeType, SnapshotId, Storage, Timespec, VirtualStat, DEFAULT_CHUNK_SIZE,
7    DEFAULT_INLINE_THRESHOLD,
8};
9use crate::engine::vfs::{Snapshottable, VirtualFileSystem};
10use async_trait::async_trait;
11use std::collections::BTreeMap;
12
/// Settings a `ChunkedFs` applies when it creates inodes and lays out file
/// content.
#[derive(Debug, Clone)]
pub struct ChunkedFsOptions {
    /// Content whose length is at most this many bytes is stored inline in
    /// the inode instead of being split into chunks.
    pub inline_threshold: usize,
    /// Chunk size, in bytes, used when content is split into blocks.
    pub chunk_size: u32,
    /// User id assigned to newly created inodes.
    pub uid: u32,
    /// Group id assigned to newly created inodes.
    pub gid: u32,
    /// Mode bits given to newly created regular files.
    pub file_mode: u32,
    /// Mode bits given to newly created directories.
    pub dir_mode: u32,
}
22
23impl Default for ChunkedFsOptions {
24    fn default() -> Self {
25        Self {
26            inline_threshold: DEFAULT_INLINE_THRESHOLD,
27            chunk_size: DEFAULT_CHUNK_SIZE,
28            uid: 0,
29            gid: 0,
30            file_mode: 0o644,
31            dir_mode: 0o755,
32        }
33    }
34}
35
36#[derive(Debug, Clone)]
37pub struct ChunkedFs<M, B> {
38    metadata: M,
39    blocks: B,
40    options: ChunkedFsOptions,
41}
42
43impl<M, B> ChunkedFs<M, B> {
44    pub fn new(metadata: M, blocks: B) -> Self {
45        Self::with_options(metadata, blocks, ChunkedFsOptions::default())
46    }
47
48    pub fn with_options(metadata: M, blocks: B, options: ChunkedFsOptions) -> Self {
49        Self {
50            metadata,
51            blocks,
52            options,
53        }
54    }
55
56    pub fn metadata(&self) -> &M {
57        &self.metadata
58    }
59
60    pub fn blocks(&self) -> &B {
61        &self.blocks
62    }
63}
64
65impl<M: MetadataStore, B: BlockStore> ChunkedFs<M, B> {
66    async fn write_existing_or_create(&self, path: &str, content: &[u8]) -> VfsResult<()> {
67        let (parent, name) = self.metadata.resolve_parent(path).await?;
68        let existing = self.metadata.lstat(path).await.ok();
69        let ino = match existing {
70            Some(meta) => {
71                if meta.kind == InodeType::Directory {
72                    return Err(VfsError::eisdir(path));
73                }
74                meta.ino
75            }
76            None => {
77                let storage = if content.len() <= self.options.inline_threshold {
78                    Storage::Inline(content.to_vec())
79                } else {
80                    Storage::Chunked {
81                        chunk_size: self.options.chunk_size,
82                    }
83                };
84                let meta = self
85                    .metadata
86                    .create(
87                        parent.ino,
88                        &name,
89                        CreateInodeAttrs::file(
90                            self.options.file_mode,
91                            self.options.uid,
92                            self.options.gid,
93                            storage,
94                        ),
95                    )
96                    .await?;
97                if content.len() <= self.options.inline_threshold {
98                    return Ok(());
99                }
100                meta.ino
101            }
102        };
103
104        if content.len() <= self.options.inline_threshold {
105            let freed = self
106                .metadata
107                .set_attr(
108                    ino,
109                    InodePatch {
110                        storage: Some(Storage::Inline(content.to_vec())),
111                        size: Some(content.len() as u64),
112                        ..InodePatch::default()
113                    },
114                )
115                .await?;
116            self.blocks.delete_many(&freed).await?;
117            return Ok(());
118        }
119
120        let mut edits = Vec::new();
121        for (index, chunk) in content.chunks(self.options.chunk_size as usize).enumerate() {
122            let key = BlockKey::from_content(chunk);
123            if !self.blocks.exists(&key).await? {
124                self.blocks.put(&key, chunk).await?;
125            }
126            edits.push(ChunkEdit {
127                index: index as u64,
128                key,
129                len: chunk.len() as u32,
130            });
131        }
132        self.metadata
133            .set_attr(
134                ino,
135                InodePatch {
136                    storage: Some(Storage::Chunked {
137                        chunk_size: self.options.chunk_size,
138                    }),
139                    size: Some(content.len() as u64),
140                    ..InodePatch::default()
141                },
142            )
143            .await?;
144        let freed = self
145            .metadata
146            .commit_write(ino, edits, content.len() as u64)
147            .await?;
148        self.blocks.delete_many(&freed).await?;
149        Ok(())
150    }
151
152    fn file_chunk_size(&self, storage: &Storage) -> u32 {
153        match storage {
154            Storage::Chunked { chunk_size } => *chunk_size,
155            Storage::Inline(_) | Storage::None => self.options.chunk_size,
156        }
157    }
158
159    fn ensure_file<'a>(&self, path: &str, meta: &'a InodeMeta) -> VfsResult<&'a InodeMeta> {
160        match meta.kind {
161            InodeType::File => Ok(meta),
162            InodeType::Directory => Err(VfsError::eisdir(path)),
163            InodeType::Symlink => Err(VfsError::eopnotsupp("resolved symlink without target file")),
164        }
165    }
166
167    async fn read_file_range(
168        &self,
169        meta: &InodeMeta,
170        offset: u64,
171        length: usize,
172    ) -> VfsResult<Vec<u8>> {
173        if length == 0 || offset >= meta.size {
174            return Ok(Vec::new());
175        }
176        let available = meta.size.saturating_sub(offset).min(length as u64);
177        let output_len = usize::try_from(available)
178            .map_err(|_| VfsError::einval(format!("range length is too large: {available}")))?;
179
180        match &meta.storage {
181            Storage::Inline(data) => {
182                let start = usize::try_from(offset).map_err(|_| {
183                    VfsError::einval(format!("range offset is too large: {offset}"))
184                })?;
185                if start >= data.len() {
186                    return Ok(vec![0; output_len]);
187                }
188                let end = start.saturating_add(output_len).min(data.len());
189                let mut output = vec![0; output_len];
190                output[..end - start].copy_from_slice(&data[start..end]);
191                Ok(output)
192            }
193            Storage::None => Ok(vec![0; output_len]),
194            Storage::Chunked { chunk_size } => {
195                let chunk_size = u64::from(*chunk_size);
196                let end_offset = offset
197                    .checked_add(available)
198                    .ok_or_else(|| VfsError::einval("range end overflows"))?;
199                let start_index = offset / chunk_size;
200                let end_index = end_offset.div_ceil(chunk_size);
201                let chunks = self
202                    .metadata
203                    .get_chunks(
204                        meta.ino,
205                        ChunkRange {
206                            start: start_index,
207                            end: Some(end_index),
208                        },
209                    )
210                    .await?;
211                let mut output = vec![0; output_len];
212                for chunk in chunks {
213                    let chunk_start = chunk.index.saturating_mul(chunk_size);
214                    let block = self.blocks.get(&chunk.key).await?;
215                    let copy_start = offset.max(chunk_start);
216                    let copy_end = end_offset.min(chunk_start.saturating_add(block.len() as u64));
217                    if copy_start >= copy_end {
218                        continue;
219                    }
220                    let output_start = usize::try_from(copy_start - offset)
221                        .map_err(|_| VfsError::einval("range output offset is too large"))?;
222                    let block_start = usize::try_from(copy_start - chunk_start)
223                        .map_err(|_| VfsError::einval("range block offset is too large"))?;
224                    let len = usize::try_from(copy_end - copy_start)
225                        .map_err(|_| VfsError::einval("range copy length is too large"))?;
226                    output[output_start..output_start + len]
227                        .copy_from_slice(&block[block_start..block_start + len]);
228                }
229                Ok(output)
230            }
231        }
232    }
233
234    async fn put_chunk_edit(&self, index: u64, data: Vec<u8>) -> VfsResult<ChunkEdit> {
235        let len = u32::try_from(data.len())
236            .map_err(|_| VfsError::einval(format!("chunk is too large: {}", data.len())))?;
237        let key = BlockKey::from_content(&data);
238        if !self.blocks.exists(&key).await? {
239            self.blocks.put(&key, &data).await?;
240        }
241        Ok(ChunkEdit { index, key, len })
242    }
243
244    async fn write_chunked_range(
245        &self,
246        meta: &InodeMeta,
247        content: &[u8],
248        offset: u64,
249    ) -> VfsResult<u64> {
250        if content.is_empty() {
251            return Ok(meta.size);
252        }
253        let content_len = u64::try_from(content.len()).map_err(|_| {
254            VfsError::einval(format!("pwrite content is too large: {}", content.len()))
255        })?;
256        let end_offset = offset
257            .checked_add(content_len)
258            .ok_or_else(|| VfsError::einval("pwrite end offset overflows"))?;
259        let new_size = meta.size.max(end_offset);
260
261        if !matches!(meta.storage, Storage::Chunked { .. })
262            && usize::try_from(new_size)
263                .ok()
264                .is_some_and(|len| len <= self.options.inline_threshold)
265        {
266            let old_len = usize::try_from(meta.size)
267                .map_err(|_| VfsError::einval(format!("file is too large: {}", meta.size)))?;
268            let mut data = self.read_file_range(meta, 0, old_len).await?;
269            let start = usize::try_from(offset)
270                .map_err(|_| VfsError::einval(format!("pwrite offset is too large: {offset}")))?;
271            let end = start.saturating_add(content.len());
272            if start > data.len() {
273                data.resize(start, 0);
274            }
275            if end > data.len() {
276                data.resize(end, 0);
277            }
278            data[start..end].copy_from_slice(content);
279            let freed = self
280                .metadata
281                .set_attr(
282                    meta.ino,
283                    InodePatch {
284                        storage: Some(Storage::Inline(data)),
285                        size: Some(new_size),
286                        ..InodePatch::default()
287                    },
288                )
289                .await?;
290            self.blocks.delete_many(&freed).await?;
291            return Ok(new_size);
292        }
293
294        let chunk_size = u64::from(self.file_chunk_size(&meta.storage));
295        let start_index = offset / chunk_size;
296        let end_index = end_offset.div_ceil(chunk_size);
297        let existing_chunks = if matches!(meta.storage, Storage::Chunked { .. }) {
298            self.metadata
299                .get_chunks(
300                    meta.ino,
301                    ChunkRange {
302                        start: start_index,
303                        end: Some(end_index),
304                    },
305                )
306                .await?
307                .into_iter()
308                .map(|chunk| (chunk.index, chunk.key))
309                .collect::<BTreeMap<_, _>>()
310        } else {
311            BTreeMap::new()
312        };
313
314        let mut edits = Vec::new();
315        for index in start_index..end_index {
316            let chunk_start = index.saturating_mul(chunk_size);
317            let chunk_len = chunk_size.min(new_size.saturating_sub(chunk_start));
318            let mut chunk_data = vec![
319                0;
320                usize::try_from(chunk_len).map_err(|_| {
321                    VfsError::einval("chunk length is too large")
322                })?
323            ];
324
325            match &meta.storage {
326                Storage::Inline(data) => {
327                    let copy_start = chunk_start.min(data.len() as u64);
328                    let copy_end = chunk_start.saturating_add(chunk_len).min(data.len() as u64);
329                    if copy_start < copy_end {
330                        let dst = usize::try_from(copy_start - chunk_start)
331                            .map_err(|_| VfsError::einval("inline chunk offset is too large"))?;
332                        let src = usize::try_from(copy_start)
333                            .map_err(|_| VfsError::einval("inline source offset is too large"))?;
334                        let len = usize::try_from(copy_end - copy_start)
335                            .map_err(|_| VfsError::einval("inline copy length is too large"))?;
336                        chunk_data[dst..dst + len].copy_from_slice(&data[src..src + len]);
337                    }
338                }
339                Storage::Chunked { .. } => {
340                    if let Some(key) = existing_chunks.get(&index) {
341                        let old = self.blocks.get(key).await?;
342                        let len = old.len().min(chunk_data.len());
343                        chunk_data[..len].copy_from_slice(&old[..len]);
344                    }
345                }
346                Storage::None => {}
347            }
348
349            let write_start = offset.max(chunk_start);
350            let write_end = end_offset.min(chunk_start.saturating_add(chunk_len));
351            if write_start < write_end {
352                let dst = usize::try_from(write_start - chunk_start)
353                    .map_err(|_| VfsError::einval("chunk write offset is too large"))?;
354                let src = usize::try_from(write_start - offset)
355                    .map_err(|_| VfsError::einval("content write offset is too large"))?;
356                let len = usize::try_from(write_end - write_start)
357                    .map_err(|_| VfsError::einval("chunk write length is too large"))?;
358                chunk_data[dst..dst + len].copy_from_slice(&content[src..src + len]);
359            }
360
361            edits.push(self.put_chunk_edit(index, chunk_data).await?);
362        }
363
364        if !matches!(meta.storage, Storage::Chunked { .. }) {
365            self.metadata
366                .set_attr(
367                    meta.ino,
368                    InodePatch {
369                        storage: Some(Storage::Chunked {
370                            chunk_size: self.options.chunk_size,
371                        }),
372                        size: Some(new_size),
373                        ..InodePatch::default()
374                    },
375                )
376                .await?;
377        }
378        let freed = self
379            .metadata
380            .commit_write(meta.ino, edits, new_size)
381            .await?;
382        self.blocks.delete_many(&freed).await?;
383        Ok(new_size)
384    }
385}
386
387#[async_trait]
388impl<M: MetadataStore, B: BlockStore> VirtualFileSystem for ChunkedFs<M, B> {
389    async fn read_file(&self, path: &str) -> VfsResult<Vec<u8>> {
390        let meta = self.metadata.resolve(path).await?;
391        self.ensure_file(path, &meta)?;
392        let len = usize::try_from(meta.size)
393            .map_err(|_| VfsError::einval(format!("file is too large: {}", meta.size)))?;
394        self.read_file_range(&meta, 0, len).await
395    }
396
397    async fn read_dir(&self, path: &str) -> VfsResult<Vec<String>> {
398        let meta = self.metadata.resolve(path).await?;
399        Ok(self
400            .metadata
401            .list_dir(meta.ino)
402            .await?
403            .into_iter()
404            .map(|entry| entry.name)
405            .collect())
406    }
407
408    async fn read_dir_with_types(&self, path: &str) -> VfsResult<Vec<Dentry>> {
409        let meta = self.metadata.resolve(path).await?;
410        Ok(self
411            .metadata
412            .list_dir(meta.ino)
413            .await?
414            .into_iter()
415            .map(|entry| Dentry {
416                name: entry.name,
417                ino: entry.meta.ino,
418                kind: entry.meta.kind,
419            })
420            .collect())
421    }
422
423    async fn write_file(&self, path: &str, content: &[u8]) -> VfsResult<()> {
424        self.write_existing_or_create(path, content).await
425    }
426
427    async fn create_dir(&self, path: &str) -> VfsResult<()> {
428        let (parent, name) = self.metadata.resolve_parent(path).await?;
429        self.metadata
430            .create(
431                parent.ino,
432                &name,
433                CreateInodeAttrs::directory(
434                    self.options.dir_mode,
435                    self.options.uid,
436                    self.options.gid,
437                ),
438            )
439            .await?;
440        Ok(())
441    }
442
443    async fn mkdir(&self, path: &str, recursive: bool) -> VfsResult<()> {
444        if !recursive {
445            return self.create_dir(path).await;
446        }
447        let normalized = normalize_path(path)?;
448        let mut current = String::new();
449        for part in normalized
450            .trim_start_matches('/')
451            .split('/')
452            .filter(|p| !p.is_empty())
453        {
454            current.push('/');
455            current.push_str(part);
456            if !self.exists(&current).await {
457                self.create_dir(&current).await?;
458            }
459        }
460        Ok(())
461    }
462
463    async fn exists(&self, path: &str) -> bool {
464        self.metadata.resolve(path).await.is_ok()
465    }
466
467    async fn stat(&self, path: &str) -> VfsResult<VirtualStat> {
468        Ok(self.metadata.resolve(path).await?.to_stat())
469    }
470
471    async fn lstat(&self, path: &str) -> VfsResult<VirtualStat> {
472        Ok(self.metadata.lstat(path).await?.to_stat())
473    }
474
475    async fn remove_file(&self, path: &str) -> VfsResult<()> {
476        let meta = self.metadata.lstat(path).await?;
477        if meta.kind == InodeType::Directory {
478            return Err(VfsError::eisdir(path));
479        }
480        let (parent, name) = self.metadata.resolve_parent(path).await?;
481        let freed = self.metadata.remove(parent.ino, &name).await?;
482        self.blocks.delete_many(&freed).await
483    }
484
485    async fn remove_dir(&self, path: &str) -> VfsResult<()> {
486        let meta = self.metadata.lstat(path).await?;
487        if meta.kind != InodeType::Directory {
488            return Err(VfsError::enotdir(path));
489        }
490        let (parent, name) = self.metadata.resolve_parent(path).await?;
491        let freed = self.metadata.remove(parent.ino, &name).await?;
492        self.blocks.delete_many(&freed).await
493    }
494
495    async fn rename(&self, old_path: &str, new_path: &str) -> VfsResult<()> {
496        let (src_parent, src) = self.metadata.resolve_parent(old_path).await?;
497        let (dst_parent, dst) = self.metadata.resolve_parent(new_path).await?;
498        let freed = self
499            .metadata
500            .rename(src_parent.ino, &src, dst_parent.ino, &dst)
501            .await?;
502        self.blocks.delete_many(&freed).await
503    }
504
505    async fn realpath(&self, path: &str) -> VfsResult<String> {
506        self.metadata.resolve(path).await?;
507        normalize_path(path)
508    }
509
510    async fn symlink(&self, target: &str, link_path: &str) -> VfsResult<()> {
511        let (parent, name) = self.metadata.resolve_parent(link_path).await?;
512        self.metadata
513            .create(
514                parent.ino,
515                &name,
516                CreateInodeAttrs::symlink(target.to_string(), self.options.uid, self.options.gid),
517            )
518            .await?;
519        Ok(())
520    }
521
522    async fn readlink(&self, path: &str) -> VfsResult<String> {
523        let meta = self.metadata.lstat(path).await?;
524        if meta.kind != InodeType::Symlink {
525            return Err(VfsError::einval(format!("not a symlink: {path}")));
526        }
527        Ok(meta.symlink_target.unwrap_or_default())
528    }
529
530    async fn link(&self, old_path: &str, new_path: &str) -> VfsResult<()> {
531        let target = self.metadata.resolve(old_path).await?;
532        let (parent, name) = self.metadata.resolve_parent(new_path).await?;
533        self.metadata.link(parent.ino, &name, target.ino).await
534    }
535
536    async fn chmod(&self, path: &str, mode: u32) -> VfsResult<()> {
537        let meta = self.metadata.resolve(path).await?;
538        self.metadata
539            .set_attr(
540                meta.ino,
541                InodePatch {
542                    mode: Some(mode),
543                    ..InodePatch::default()
544                },
545            )
546            .await?;
547        Ok(())
548    }
549
550    async fn chown(&self, path: &str, uid: u32, gid: u32) -> VfsResult<()> {
551        let meta = self.metadata.resolve(path).await?;
552        self.metadata
553            .set_attr(
554                meta.ino,
555                InodePatch {
556                    uid: Some(uid),
557                    gid: Some(gid),
558                    ..InodePatch::default()
559                },
560            )
561            .await?;
562        Ok(())
563    }
564
565    async fn utimes(&self, path: &str, atime_ms: u64, mtime_ms: u64) -> VfsResult<()> {
566        let meta = self.metadata.resolve(path).await?;
567        self.metadata
568            .set_attr(
569                meta.ino,
570                InodePatch {
571                    atime: Some(ms_to_timespec(atime_ms)),
572                    mtime: Some(ms_to_timespec(mtime_ms)),
573                    ..InodePatch::default()
574                },
575            )
576            .await?;
577        Ok(())
578    }
579
580    async fn truncate(&self, path: &str, length: u64) -> VfsResult<()> {
581        let meta = self.metadata.resolve(path).await?;
582        self.ensure_file(path, &meta)?;
583        if length == meta.size {
584            return Ok(());
585        }
586
587        if usize::try_from(length)
588            .ok()
589            .is_some_and(|len| len <= self.options.inline_threshold)
590        {
591            let data = self
592                .read_file_range(&meta, 0, usize::try_from(length).unwrap_or(0))
593                .await?;
594            let freed = self
595                .metadata
596                .set_attr(
597                    meta.ino,
598                    InodePatch {
599                        storage: Some(Storage::Inline(data)),
600                        size: Some(length),
601                        ..InodePatch::default()
602                    },
603                )
604                .await?;
605            self.blocks.delete_many(&freed).await?;
606            return Ok(());
607        }
608
609        let chunk_size = u64::from(self.file_chunk_size(&meta.storage));
610        let mut edits = Vec::new();
611        if !matches!(meta.storage, Storage::Chunked { .. }) {
612            let existing_len = meta.size.min(length);
613            let mut offset = 0;
614            while offset < existing_len {
615                let len = (existing_len - offset).min(chunk_size);
616                let data = self
617                    .read_file_range(
618                        &meta,
619                        offset,
620                        usize::try_from(len)
621                            .map_err(|_| VfsError::einval("truncate chunk is too large"))?,
622                    )
623                    .await?;
624                edits.push(self.put_chunk_edit(offset / chunk_size, data).await?);
625                offset = offset.saturating_add(len);
626            }
627            self.metadata
628                .set_attr(
629                    meta.ino,
630                    InodePatch {
631                        storage: Some(Storage::Chunked {
632                            chunk_size: self.options.chunk_size,
633                        }),
634                        size: Some(length),
635                        ..InodePatch::default()
636                    },
637                )
638                .await?;
639        } else if length < meta.size && !length.is_multiple_of(chunk_size) {
640            let final_index = length / chunk_size;
641            let final_start = final_index.saturating_mul(chunk_size);
642            let final_len = length - final_start;
643            let data = self
644                .read_file_range(
645                    &meta,
646                    final_start,
647                    usize::try_from(final_len)
648                        .map_err(|_| VfsError::einval("truncate final chunk is too large"))?,
649                )
650                .await?;
651            edits.push(self.put_chunk_edit(final_index, data).await?);
652        }
653
654        let freed = self.metadata.commit_write(meta.ino, edits, length).await?;
655        self.blocks.delete_many(&freed).await
656    }
657
658    async fn pread(&self, path: &str, offset: u64, length: usize) -> VfsResult<Vec<u8>> {
659        let meta = self.metadata.resolve(path).await?;
660        self.ensure_file(path, &meta)?;
661        self.read_file_range(&meta, offset, length).await
662    }
663
664    async fn pwrite(&self, path: &str, content: &[u8], offset: u64) -> VfsResult<()> {
665        let meta = self.metadata.resolve(path).await?;
666        self.ensure_file(path, &meta)?;
667        self.write_chunked_range(&meta, content, offset).await?;
668        Ok(())
669    }
670
671    async fn append(&self, path: &str, content: &[u8]) -> VfsResult<u64> {
672        let meta = self.metadata.resolve(path).await?;
673        self.ensure_file(path, &meta)?;
674        let len = meta
675            .size
676            .checked_add(u64::try_from(content.len()).map_err(|_| {
677                VfsError::einval(format!("append content is too large: {}", content.len()))
678            })?)
679            .ok_or_else(|| VfsError::einval("append size overflows"))?;
680        self.write_chunked_range(&meta, content, meta.size).await?;
681        Ok(len)
682    }
683}
684
685#[async_trait]
686impl<M: MetadataStore, B: BlockStore> Snapshottable for ChunkedFs<M, B> {
687    async fn snapshot(&self, root: u64) -> VfsResult<SnapshotId> {
688        self.metadata.snapshot(root).await
689    }
690
691    async fn fork(&self, snap: SnapshotId) -> VfsResult<u64> {
692        self.metadata.fork(snap).await
693    }
694}
695
696fn ms_to_timespec(ms: u64) -> Timespec {
697    Timespec {
698        sec: (ms / 1_000) as i64,
699        nsec: ((ms % 1_000) * 1_000_000) as u32,
700    }
701}