1use crate::engine::block::BlockStore;
2use crate::engine::error::{VfsError, VfsResult};
3use crate::engine::metadata::MetadataStore;
4use crate::engine::types::{
5 normalize_path, BlockKey, ChunkEdit, ChunkRange, CreateInodeAttrs, Dentry, InodeMeta,
6 InodePatch, InodeType, SnapshotId, Storage, Timespec, VirtualStat, DEFAULT_CHUNK_SIZE,
7 DEFAULT_INLINE_THRESHOLD,
8};
9use crate::engine::vfs::{Snapshottable, VirtualFileSystem};
10use async_trait::async_trait;
11use std::collections::BTreeMap;
12
/// Tunable settings for a [`ChunkedFs`].
#[derive(Debug, Clone)]
pub struct ChunkedFsOptions {
    /// Largest content length, in bytes, that is stored as inline data
    /// instead of being split into chunks.
    pub inline_threshold: usize,
    /// Chunk size, in bytes, used when a file is created as, or converted
    /// to, chunked storage.
    pub chunk_size: u32,
    /// Owner user id passed along when inodes are created.
    pub uid: u32,
    /// Owner group id passed along when inodes are created.
    pub gid: u32,
    /// Mode bits passed along when regular files are created.
    pub file_mode: u32,
    /// Mode bits passed along when directories are created.
    pub dir_mode: u32,
}
22
23impl Default for ChunkedFsOptions {
24 fn default() -> Self {
25 Self {
26 inline_threshold: DEFAULT_INLINE_THRESHOLD,
27 chunk_size: DEFAULT_CHUNK_SIZE,
28 uid: 0,
29 gid: 0,
30 file_mode: 0o644,
31 dir_mode: 0o755,
32 }
33 }
34}
35
/// A filesystem front end that pairs a metadata store `M` with a block
/// store `B`.
#[derive(Debug, Clone)]
pub struct ChunkedFs<M, B> {
    /// Store used for path resolution, inode attributes and chunk maps.
    metadata: M,
    /// Store holding chunk payloads addressed by `BlockKey`.
    blocks: B,
    /// Settings applied when creating inodes and choosing a storage layout.
    options: ChunkedFsOptions,
}
42
43impl<M, B> ChunkedFs<M, B> {
44 pub fn new(metadata: M, blocks: B) -> Self {
45 Self::with_options(metadata, blocks, ChunkedFsOptions::default())
46 }
47
48 pub fn with_options(metadata: M, blocks: B, options: ChunkedFsOptions) -> Self {
49 Self {
50 metadata,
51 blocks,
52 options,
53 }
54 }
55
56 pub fn metadata(&self) -> &M {
57 &self.metadata
58 }
59
60 pub fn blocks(&self) -> &B {
61 &self.blocks
62 }
63}
64
65impl<M: MetadataStore, B: BlockStore> ChunkedFs<M, B> {
66 async fn write_existing_or_create(&self, path: &str, content: &[u8]) -> VfsResult<()> {
67 let (parent, name) = self.metadata.resolve_parent(path).await?;
68 let existing = self.metadata.lstat(path).await.ok();
69 let ino = match existing {
70 Some(meta) => {
71 if meta.kind == InodeType::Directory {
72 return Err(VfsError::eisdir(path));
73 }
74 meta.ino
75 }
76 None => {
77 let storage = if content.len() <= self.options.inline_threshold {
78 Storage::Inline(content.to_vec())
79 } else {
80 Storage::Chunked {
81 chunk_size: self.options.chunk_size,
82 }
83 };
84 let meta = self
85 .metadata
86 .create(
87 parent.ino,
88 &name,
89 CreateInodeAttrs::file(
90 self.options.file_mode,
91 self.options.uid,
92 self.options.gid,
93 storage,
94 ),
95 )
96 .await?;
97 if content.len() <= self.options.inline_threshold {
98 return Ok(());
99 }
100 meta.ino
101 }
102 };
103
104 if content.len() <= self.options.inline_threshold {
105 let freed = self
106 .metadata
107 .set_attr(
108 ino,
109 InodePatch {
110 storage: Some(Storage::Inline(content.to_vec())),
111 size: Some(content.len() as u64),
112 ..InodePatch::default()
113 },
114 )
115 .await?;
116 self.blocks.delete_many(&freed).await?;
117 return Ok(());
118 }
119
120 let mut edits = Vec::new();
121 for (index, chunk) in content.chunks(self.options.chunk_size as usize).enumerate() {
122 let key = BlockKey::from_content(chunk);
123 if !self.blocks.exists(&key).await? {
124 self.blocks.put(&key, chunk).await?;
125 }
126 edits.push(ChunkEdit {
127 index: index as u64,
128 key,
129 len: chunk.len() as u32,
130 });
131 }
132 self.metadata
133 .set_attr(
134 ino,
135 InodePatch {
136 storage: Some(Storage::Chunked {
137 chunk_size: self.options.chunk_size,
138 }),
139 size: Some(content.len() as u64),
140 ..InodePatch::default()
141 },
142 )
143 .await?;
144 let freed = self
145 .metadata
146 .commit_write(ino, edits, content.len() as u64)
147 .await?;
148 self.blocks.delete_many(&freed).await?;
149 Ok(())
150 }
151
152 fn file_chunk_size(&self, storage: &Storage) -> u32 {
153 match storage {
154 Storage::Chunked { chunk_size } => *chunk_size,
155 Storage::Inline(_) | Storage::None => self.options.chunk_size,
156 }
157 }
158
159 fn ensure_file<'a>(&self, path: &str, meta: &'a InodeMeta) -> VfsResult<&'a InodeMeta> {
160 match meta.kind {
161 InodeType::File => Ok(meta),
162 InodeType::Directory => Err(VfsError::eisdir(path)),
163 InodeType::Symlink => Err(VfsError::eopnotsupp("resolved symlink without target file")),
164 }
165 }
166
167 async fn read_file_range(
168 &self,
169 meta: &InodeMeta,
170 offset: u64,
171 length: usize,
172 ) -> VfsResult<Vec<u8>> {
173 if length == 0 || offset >= meta.size {
174 return Ok(Vec::new());
175 }
176 let available = meta.size.saturating_sub(offset).min(length as u64);
177 let output_len = usize::try_from(available)
178 .map_err(|_| VfsError::einval(format!("range length is too large: {available}")))?;
179
180 match &meta.storage {
181 Storage::Inline(data) => {
182 let start = usize::try_from(offset).map_err(|_| {
183 VfsError::einval(format!("range offset is too large: {offset}"))
184 })?;
185 if start >= data.len() {
186 return Ok(vec![0; output_len]);
187 }
188 let end = start.saturating_add(output_len).min(data.len());
189 let mut output = vec![0; output_len];
190 output[..end - start].copy_from_slice(&data[start..end]);
191 Ok(output)
192 }
193 Storage::None => Ok(vec![0; output_len]),
194 Storage::Chunked { chunk_size } => {
195 let chunk_size = u64::from(*chunk_size);
196 let end_offset = offset
197 .checked_add(available)
198 .ok_or_else(|| VfsError::einval("range end overflows"))?;
199 let start_index = offset / chunk_size;
200 let end_index = end_offset.div_ceil(chunk_size);
201 let chunks = self
202 .metadata
203 .get_chunks(
204 meta.ino,
205 ChunkRange {
206 start: start_index,
207 end: Some(end_index),
208 },
209 )
210 .await?;
211 let mut output = vec![0; output_len];
212 for chunk in chunks {
213 let chunk_start = chunk.index.saturating_mul(chunk_size);
214 let block = self.blocks.get(&chunk.key).await?;
215 let copy_start = offset.max(chunk_start);
216 let copy_end = end_offset.min(chunk_start.saturating_add(block.len() as u64));
217 if copy_start >= copy_end {
218 continue;
219 }
220 let output_start = usize::try_from(copy_start - offset)
221 .map_err(|_| VfsError::einval("range output offset is too large"))?;
222 let block_start = usize::try_from(copy_start - chunk_start)
223 .map_err(|_| VfsError::einval("range block offset is too large"))?;
224 let len = usize::try_from(copy_end - copy_start)
225 .map_err(|_| VfsError::einval("range copy length is too large"))?;
226 output[output_start..output_start + len]
227 .copy_from_slice(&block[block_start..block_start + len]);
228 }
229 Ok(output)
230 }
231 }
232 }
233
234 async fn put_chunk_edit(&self, index: u64, data: Vec<u8>) -> VfsResult<ChunkEdit> {
235 let len = u32::try_from(data.len())
236 .map_err(|_| VfsError::einval(format!("chunk is too large: {}", data.len())))?;
237 let key = BlockKey::from_content(&data);
238 if !self.blocks.exists(&key).await? {
239 self.blocks.put(&key, &data).await?;
240 }
241 Ok(ChunkEdit { index, key, len })
242 }
243
244 async fn write_chunked_range(
245 &self,
246 meta: &InodeMeta,
247 content: &[u8],
248 offset: u64,
249 ) -> VfsResult<u64> {
250 if content.is_empty() {
251 return Ok(meta.size);
252 }
253 let content_len = u64::try_from(content.len()).map_err(|_| {
254 VfsError::einval(format!("pwrite content is too large: {}", content.len()))
255 })?;
256 let end_offset = offset
257 .checked_add(content_len)
258 .ok_or_else(|| VfsError::einval("pwrite end offset overflows"))?;
259 let new_size = meta.size.max(end_offset);
260
261 if !matches!(meta.storage, Storage::Chunked { .. })
262 && usize::try_from(new_size)
263 .ok()
264 .is_some_and(|len| len <= self.options.inline_threshold)
265 {
266 let old_len = usize::try_from(meta.size)
267 .map_err(|_| VfsError::einval(format!("file is too large: {}", meta.size)))?;
268 let mut data = self.read_file_range(meta, 0, old_len).await?;
269 let start = usize::try_from(offset)
270 .map_err(|_| VfsError::einval(format!("pwrite offset is too large: {offset}")))?;
271 let end = start.saturating_add(content.len());
272 if start > data.len() {
273 data.resize(start, 0);
274 }
275 if end > data.len() {
276 data.resize(end, 0);
277 }
278 data[start..end].copy_from_slice(content);
279 let freed = self
280 .metadata
281 .set_attr(
282 meta.ino,
283 InodePatch {
284 storage: Some(Storage::Inline(data)),
285 size: Some(new_size),
286 ..InodePatch::default()
287 },
288 )
289 .await?;
290 self.blocks.delete_many(&freed).await?;
291 return Ok(new_size);
292 }
293
294 let chunk_size = u64::from(self.file_chunk_size(&meta.storage));
295 let start_index = offset / chunk_size;
296 let end_index = end_offset.div_ceil(chunk_size);
297 let existing_chunks = if matches!(meta.storage, Storage::Chunked { .. }) {
298 self.metadata
299 .get_chunks(
300 meta.ino,
301 ChunkRange {
302 start: start_index,
303 end: Some(end_index),
304 },
305 )
306 .await?
307 .into_iter()
308 .map(|chunk| (chunk.index, chunk.key))
309 .collect::<BTreeMap<_, _>>()
310 } else {
311 BTreeMap::new()
312 };
313
314 let mut edits = Vec::new();
315 for index in start_index..end_index {
316 let chunk_start = index.saturating_mul(chunk_size);
317 let chunk_len = chunk_size.min(new_size.saturating_sub(chunk_start));
318 let mut chunk_data = vec![
319 0;
320 usize::try_from(chunk_len).map_err(|_| {
321 VfsError::einval("chunk length is too large")
322 })?
323 ];
324
325 match &meta.storage {
326 Storage::Inline(data) => {
327 let copy_start = chunk_start.min(data.len() as u64);
328 let copy_end = chunk_start.saturating_add(chunk_len).min(data.len() as u64);
329 if copy_start < copy_end {
330 let dst = usize::try_from(copy_start - chunk_start)
331 .map_err(|_| VfsError::einval("inline chunk offset is too large"))?;
332 let src = usize::try_from(copy_start)
333 .map_err(|_| VfsError::einval("inline source offset is too large"))?;
334 let len = usize::try_from(copy_end - copy_start)
335 .map_err(|_| VfsError::einval("inline copy length is too large"))?;
336 chunk_data[dst..dst + len].copy_from_slice(&data[src..src + len]);
337 }
338 }
339 Storage::Chunked { .. } => {
340 if let Some(key) = existing_chunks.get(&index) {
341 let old = self.blocks.get(key).await?;
342 let len = old.len().min(chunk_data.len());
343 chunk_data[..len].copy_from_slice(&old[..len]);
344 }
345 }
346 Storage::None => {}
347 }
348
349 let write_start = offset.max(chunk_start);
350 let write_end = end_offset.min(chunk_start.saturating_add(chunk_len));
351 if write_start < write_end {
352 let dst = usize::try_from(write_start - chunk_start)
353 .map_err(|_| VfsError::einval("chunk write offset is too large"))?;
354 let src = usize::try_from(write_start - offset)
355 .map_err(|_| VfsError::einval("content write offset is too large"))?;
356 let len = usize::try_from(write_end - write_start)
357 .map_err(|_| VfsError::einval("chunk write length is too large"))?;
358 chunk_data[dst..dst + len].copy_from_slice(&content[src..src + len]);
359 }
360
361 edits.push(self.put_chunk_edit(index, chunk_data).await?);
362 }
363
364 if !matches!(meta.storage, Storage::Chunked { .. }) {
365 self.metadata
366 .set_attr(
367 meta.ino,
368 InodePatch {
369 storage: Some(Storage::Chunked {
370 chunk_size: self.options.chunk_size,
371 }),
372 size: Some(new_size),
373 ..InodePatch::default()
374 },
375 )
376 .await?;
377 }
378 let freed = self
379 .metadata
380 .commit_write(meta.ino, edits, new_size)
381 .await?;
382 self.blocks.delete_many(&freed).await?;
383 Ok(new_size)
384 }
385}
386
387#[async_trait]
388impl<M: MetadataStore, B: BlockStore> VirtualFileSystem for ChunkedFs<M, B> {
389 async fn read_file(&self, path: &str) -> VfsResult<Vec<u8>> {
390 let meta = self.metadata.resolve(path).await?;
391 self.ensure_file(path, &meta)?;
392 let len = usize::try_from(meta.size)
393 .map_err(|_| VfsError::einval(format!("file is too large: {}", meta.size)))?;
394 self.read_file_range(&meta, 0, len).await
395 }
396
397 async fn read_dir(&self, path: &str) -> VfsResult<Vec<String>> {
398 let meta = self.metadata.resolve(path).await?;
399 Ok(self
400 .metadata
401 .list_dir(meta.ino)
402 .await?
403 .into_iter()
404 .map(|entry| entry.name)
405 .collect())
406 }
407
408 async fn read_dir_with_types(&self, path: &str) -> VfsResult<Vec<Dentry>> {
409 let meta = self.metadata.resolve(path).await?;
410 Ok(self
411 .metadata
412 .list_dir(meta.ino)
413 .await?
414 .into_iter()
415 .map(|entry| Dentry {
416 name: entry.name,
417 ino: entry.meta.ino,
418 kind: entry.meta.kind,
419 })
420 .collect())
421 }
422
423 async fn write_file(&self, path: &str, content: &[u8]) -> VfsResult<()> {
424 self.write_existing_or_create(path, content).await
425 }
426
427 async fn create_dir(&self, path: &str) -> VfsResult<()> {
428 let (parent, name) = self.metadata.resolve_parent(path).await?;
429 self.metadata
430 .create(
431 parent.ino,
432 &name,
433 CreateInodeAttrs::directory(
434 self.options.dir_mode,
435 self.options.uid,
436 self.options.gid,
437 ),
438 )
439 .await?;
440 Ok(())
441 }
442
443 async fn mkdir(&self, path: &str, recursive: bool) -> VfsResult<()> {
444 if !recursive {
445 return self.create_dir(path).await;
446 }
447 let normalized = normalize_path(path)?;
448 let mut current = String::new();
449 for part in normalized
450 .trim_start_matches('/')
451 .split('/')
452 .filter(|p| !p.is_empty())
453 {
454 current.push('/');
455 current.push_str(part);
456 if !self.exists(¤t).await {
457 self.create_dir(¤t).await?;
458 }
459 }
460 Ok(())
461 }
462
463 async fn exists(&self, path: &str) -> bool {
464 self.metadata.resolve(path).await.is_ok()
465 }
466
467 async fn stat(&self, path: &str) -> VfsResult<VirtualStat> {
468 Ok(self.metadata.resolve(path).await?.to_stat())
469 }
470
471 async fn lstat(&self, path: &str) -> VfsResult<VirtualStat> {
472 Ok(self.metadata.lstat(path).await?.to_stat())
473 }
474
475 async fn remove_file(&self, path: &str) -> VfsResult<()> {
476 let meta = self.metadata.lstat(path).await?;
477 if meta.kind == InodeType::Directory {
478 return Err(VfsError::eisdir(path));
479 }
480 let (parent, name) = self.metadata.resolve_parent(path).await?;
481 let freed = self.metadata.remove(parent.ino, &name).await?;
482 self.blocks.delete_many(&freed).await
483 }
484
485 async fn remove_dir(&self, path: &str) -> VfsResult<()> {
486 let meta = self.metadata.lstat(path).await?;
487 if meta.kind != InodeType::Directory {
488 return Err(VfsError::enotdir(path));
489 }
490 let (parent, name) = self.metadata.resolve_parent(path).await?;
491 let freed = self.metadata.remove(parent.ino, &name).await?;
492 self.blocks.delete_many(&freed).await
493 }
494
495 async fn rename(&self, old_path: &str, new_path: &str) -> VfsResult<()> {
496 let (src_parent, src) = self.metadata.resolve_parent(old_path).await?;
497 let (dst_parent, dst) = self.metadata.resolve_parent(new_path).await?;
498 let freed = self
499 .metadata
500 .rename(src_parent.ino, &src, dst_parent.ino, &dst)
501 .await?;
502 self.blocks.delete_many(&freed).await
503 }
504
505 async fn realpath(&self, path: &str) -> VfsResult<String> {
506 self.metadata.resolve(path).await?;
507 normalize_path(path)
508 }
509
510 async fn symlink(&self, target: &str, link_path: &str) -> VfsResult<()> {
511 let (parent, name) = self.metadata.resolve_parent(link_path).await?;
512 self.metadata
513 .create(
514 parent.ino,
515 &name,
516 CreateInodeAttrs::symlink(target.to_string(), self.options.uid, self.options.gid),
517 )
518 .await?;
519 Ok(())
520 }
521
522 async fn readlink(&self, path: &str) -> VfsResult<String> {
523 let meta = self.metadata.lstat(path).await?;
524 if meta.kind != InodeType::Symlink {
525 return Err(VfsError::einval(format!("not a symlink: {path}")));
526 }
527 Ok(meta.symlink_target.unwrap_or_default())
528 }
529
530 async fn link(&self, old_path: &str, new_path: &str) -> VfsResult<()> {
531 let target = self.metadata.resolve(old_path).await?;
532 let (parent, name) = self.metadata.resolve_parent(new_path).await?;
533 self.metadata.link(parent.ino, &name, target.ino).await
534 }
535
536 async fn chmod(&self, path: &str, mode: u32) -> VfsResult<()> {
537 let meta = self.metadata.resolve(path).await?;
538 self.metadata
539 .set_attr(
540 meta.ino,
541 InodePatch {
542 mode: Some(mode),
543 ..InodePatch::default()
544 },
545 )
546 .await?;
547 Ok(())
548 }
549
550 async fn chown(&self, path: &str, uid: u32, gid: u32) -> VfsResult<()> {
551 let meta = self.metadata.resolve(path).await?;
552 self.metadata
553 .set_attr(
554 meta.ino,
555 InodePatch {
556 uid: Some(uid),
557 gid: Some(gid),
558 ..InodePatch::default()
559 },
560 )
561 .await?;
562 Ok(())
563 }
564
565 async fn utimes(&self, path: &str, atime_ms: u64, mtime_ms: u64) -> VfsResult<()> {
566 let meta = self.metadata.resolve(path).await?;
567 self.metadata
568 .set_attr(
569 meta.ino,
570 InodePatch {
571 atime: Some(ms_to_timespec(atime_ms)),
572 mtime: Some(ms_to_timespec(mtime_ms)),
573 ..InodePatch::default()
574 },
575 )
576 .await?;
577 Ok(())
578 }
579
580 async fn truncate(&self, path: &str, length: u64) -> VfsResult<()> {
581 let meta = self.metadata.resolve(path).await?;
582 self.ensure_file(path, &meta)?;
583 if length == meta.size {
584 return Ok(());
585 }
586
587 if usize::try_from(length)
588 .ok()
589 .is_some_and(|len| len <= self.options.inline_threshold)
590 {
591 let data = self
592 .read_file_range(&meta, 0, usize::try_from(length).unwrap_or(0))
593 .await?;
594 let freed = self
595 .metadata
596 .set_attr(
597 meta.ino,
598 InodePatch {
599 storage: Some(Storage::Inline(data)),
600 size: Some(length),
601 ..InodePatch::default()
602 },
603 )
604 .await?;
605 self.blocks.delete_many(&freed).await?;
606 return Ok(());
607 }
608
609 let chunk_size = u64::from(self.file_chunk_size(&meta.storage));
610 let mut edits = Vec::new();
611 if !matches!(meta.storage, Storage::Chunked { .. }) {
612 let existing_len = meta.size.min(length);
613 let mut offset = 0;
614 while offset < existing_len {
615 let len = (existing_len - offset).min(chunk_size);
616 let data = self
617 .read_file_range(
618 &meta,
619 offset,
620 usize::try_from(len)
621 .map_err(|_| VfsError::einval("truncate chunk is too large"))?,
622 )
623 .await?;
624 edits.push(self.put_chunk_edit(offset / chunk_size, data).await?);
625 offset = offset.saturating_add(len);
626 }
627 self.metadata
628 .set_attr(
629 meta.ino,
630 InodePatch {
631 storage: Some(Storage::Chunked {
632 chunk_size: self.options.chunk_size,
633 }),
634 size: Some(length),
635 ..InodePatch::default()
636 },
637 )
638 .await?;
639 } else if length < meta.size && !length.is_multiple_of(chunk_size) {
640 let final_index = length / chunk_size;
641 let final_start = final_index.saturating_mul(chunk_size);
642 let final_len = length - final_start;
643 let data = self
644 .read_file_range(
645 &meta,
646 final_start,
647 usize::try_from(final_len)
648 .map_err(|_| VfsError::einval("truncate final chunk is too large"))?,
649 )
650 .await?;
651 edits.push(self.put_chunk_edit(final_index, data).await?);
652 }
653
654 let freed = self.metadata.commit_write(meta.ino, edits, length).await?;
655 self.blocks.delete_many(&freed).await
656 }
657
658 async fn pread(&self, path: &str, offset: u64, length: usize) -> VfsResult<Vec<u8>> {
659 let meta = self.metadata.resolve(path).await?;
660 self.ensure_file(path, &meta)?;
661 self.read_file_range(&meta, offset, length).await
662 }
663
664 async fn pwrite(&self, path: &str, content: &[u8], offset: u64) -> VfsResult<()> {
665 let meta = self.metadata.resolve(path).await?;
666 self.ensure_file(path, &meta)?;
667 self.write_chunked_range(&meta, content, offset).await?;
668 Ok(())
669 }
670
671 async fn append(&self, path: &str, content: &[u8]) -> VfsResult<u64> {
672 let meta = self.metadata.resolve(path).await?;
673 self.ensure_file(path, &meta)?;
674 let len = meta
675 .size
676 .checked_add(u64::try_from(content.len()).map_err(|_| {
677 VfsError::einval(format!("append content is too large: {}", content.len()))
678 })?)
679 .ok_or_else(|| VfsError::einval("append size overflows"))?;
680 self.write_chunked_range(&meta, content, meta.size).await?;
681 Ok(len)
682 }
683}
684
685#[async_trait]
686impl<M: MetadataStore, B: BlockStore> Snapshottable for ChunkedFs<M, B> {
687 async fn snapshot(&self, root: u64) -> VfsResult<SnapshotId> {
688 self.metadata.snapshot(root).await
689 }
690
691 async fn fork(&self, snap: SnapshotId) -> VfsResult<u64> {
692 self.metadata.fork(snap).await
693 }
694}
695
696fn ms_to_timespec(ms: u64) -> Timespec {
697 Timespec {
698 sec: (ms / 1_000) as i64,
699 nsec: ((ms % 1_000) * 1_000_000) as u32,
700 }
701}