use std::sync::{Arc, Mutex, MutexGuard};
use crate::block::BlockDevice;
use crate::fs::ext::Ext;
use crate::fs::{DirEntry, FileMeta};
/// Cloneable handle to an `Ext` filesystem and its block device.
///
/// The state lives behind an `Arc<Mutex<..>>`, so every clone of a handle
/// refers to the same filesystem/device pair and access is serialized by
/// the mutex.
#[derive(Clone)]
pub struct SharedExt {
/// Shared, mutex-guarded filesystem and device state.
inner: Arc<Mutex<Inner>>,
}
/// The state protected by the mutex inside `SharedExt`.
///
/// Both fields sit under the same lock because `Ext` operations are handed
/// the device alongside the filesystem on every call.
struct Inner {
/// Filesystem state.
ext: Ext,
/// Boxed block device passed to each `Ext` operation.
dev: Box<dyn BlockDevice>,
}
impl SharedExt {
pub fn new(ext: Ext, dev: Box<dyn BlockDevice>) -> Self {
Self {
inner: Arc::new(Mutex::new(Inner { ext, dev })),
}
}
pub fn with_inner<F, T>(&self, f: F) -> crate::Result<T>
where
F: FnOnce(&mut Ext, &mut dyn BlockDevice) -> crate::Result<T>,
{
let mut guard: MutexGuard<'_, Inner> = self
.inner
.lock()
.map_err(|_| crate::Error::Unsupported("SharedExt: mutex poisoned".into()))?;
let inner = &mut *guard;
f(&mut inner.ext, inner.dev.as_mut())
}
pub fn read_inode(&self, ino: u32) -> crate::Result<crate::fs::ext::inode::Inode> {
self.with_inner(|ext, dev| ext.read_inode(dev, ino))
}
pub fn list_inode(&self, ino: u32) -> crate::Result<Vec<DirEntry>> {
self.with_inner(|ext, dev| ext.list_inode(dev, ino))
}
pub fn path_to_inode(&self, path: &str) -> crate::Result<u32> {
self.with_inner(|ext, dev| ext.path_to_inode(dev, path))
}
pub fn add_file_to_streaming(
&self,
parent_ino: u32,
name: &[u8],
reader: &mut dyn std::io::Read,
len: u64,
meta: FileMeta,
) -> crate::Result<u32> {
self.with_inner(|ext, dev| {
ext.add_file_to_streaming(dev, parent_ino, name, reader, len, meta)
})
}
pub fn flush(&self) -> crate::Result<()> {
self.with_inner(|ext, dev| ext.flush(dev))
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::block::MemoryBackend;
    use crate::fs::ext::{FormatOpts, FsKind};
    use std::sync::Arc;
    use std::thread;

    /// Drives one `SharedExt` from 8 writer threads (5 files each) and 4
    /// reader threads at once, then checks that every written name shows up
    /// in the final listing and that every listed inode has a non-zero mode.
    #[test]
    fn shared_ext_takes_concurrent_traffic() {
        let opts = FormatOpts {
            kind: FsKind::Ext4,
            block_size: 1024,
            blocks_count: 4096,
            inodes_count: 256,
            journal_blocks: 1024,
            ..FormatOpts::default()
        };
        let image_bytes = opts.blocks_count as u64 * opts.block_size as u64;
        let mut backend = MemoryBackend::new(image_bytes);
        let fs = Ext::format_with(&mut backend, &opts).unwrap();
        let shared = SharedExt::new(fs, Box::new(backend));

        // Directory inode used as the parent for every write and listing.
        // NOTE(review): presumably the ext root directory; confirm.
        let dir_ino = 2u32;
        let n_writers = 8usize;
        let per_writer = 5u32;

        // Writers: each thread adds `per_writer` small files under `dir_ino`.
        let writers = (0..n_writers).map(|w| {
            let s = shared.clone();
            thread::spawn(move || -> crate::Result<()> {
                for i in 0..per_writer {
                    let name = format!("w{w}_f{i}");
                    let payload = format!("writer {w} file {i}\n").into_bytes();
                    let len = payload.len() as u64;
                    let mut reader = std::io::Cursor::new(payload);
                    s.add_file_to_streaming(
                        dir_ino,
                        name.as_bytes(),
                        &mut reader,
                        len,
                        FileMeta::with_mode(0o644),
                    )?;
                }
                Ok(())
            })
        });

        // Readers: each thread lists the directory 20 times, discarding the result.
        let readers = (0..4).map(|_| {
            let s = shared.clone();
            thread::spawn(move || -> crate::Result<()> {
                (0..20).try_for_each(|_| s.list_inode(dir_ino).map(drop))
            })
        });

        // Spawn everything (writers first, then readers) before joining any thread.
        let handles: Vec<_> = writers.chain(readers).collect();
        handles
            .into_iter()
            .for_each(|h| h.join().unwrap().unwrap());

        let entries = shared.list_inode(dir_ino).unwrap();
        let names: std::collections::HashSet<String> =
            entries.iter().map(|entry| entry.name.clone()).collect();

        // Every (writer, file) pair must be present in the final listing.
        let expected_pairs =
            (0..n_writers).flat_map(|w| (0..per_writer).map(move |i| (w, i)));
        for (w, i) in expected_pairs {
            let expected = format!("w{w}_f{i}");
            assert!(
                names.contains(&expected),
                "writer {w} file {i} missing from final listing: {names:?}"
            );
        }

        // Every listed entry must resolve to an inode with a non-zero mode.
        entries.iter().for_each(|entry| {
            let inode = shared.read_inode(entry.inode).unwrap();
            assert_ne!(inode.mode, 0, "inode {} has zero mode", entry.inode);
        });
    }

    // Compile-time helper: instantiating it for `T` only type-checks when
    // `T: Send + Sync`. It is never called at runtime, hence `dead_code`.
    #[allow(dead_code)]
    fn assert_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    // Never invoked; it only has to compile, which proves `SharedExt` and
    // `Arc<SharedExt>` are both `Send + Sync`.
    #[allow(dead_code)]
    fn _check_shared_ext_send_sync() {
        assert_send_sync::<SharedExt>();
        assert_send_sync::<Arc<SharedExt>>();
    }
}