mod backend;
pub mod cas;
mod nbd;
pub use backend::FlatFileBackend;
pub use cas::{CasBackend, ChunkIndex, ChunkStore, LocalChunkStore};
pub use nbd::NbdBackend;
use std::os::unix::net::UnixListener;
use std::path::Path;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use anyhow::{Context, Result};
use tracing::{debug, info, warn};
pub struct NbdHandle {
socket_path: String,
shutdown: Option<std::sync::mpsc::Sender<()>>,
thread: Option<std::thread::JoinHandle<()>>,
cas_backend: Option<Arc<CasBackend>>,
active_fd: Arc<AtomicI32>,
}
impl NbdHandle {
pub fn uri(&self) -> String {
format!("nbd+unix:///export?socket={}", self.socket_path)
}
pub fn save_checkpoint(&self, index_path: &str) -> Result<()> {
let backend = self.cas_backend.as_ref()
.ok_or_else(|| anyhow::anyhow!("save_checkpoint requires CAS backend"))?;
backend.save_index(index_path)
}
}
impl Drop for NbdHandle {
fn drop(&mut self) {
if let Some(ref backend) = self.cas_backend {
let _ = backend.flush();
}
if let Some(tx) = self.shutdown.take() {
let _ = tx.send(());
}
let fd = self.active_fd.load(Ordering::Acquire);
if fd >= 0 {
unsafe { libc::shutdown(fd, libc::SHUT_RDWR); }
}
let _ = std::os::unix::net::UnixStream::connect(&self.socket_path);
if let Some(thread) = self.thread.take() {
let _ = thread.join();
}
let _ = std::fs::remove_file(&self.socket_path);
}
}
fn start_nbd_with_backend(
backend: Arc<dyn NbdBackend>,
socket_path: &str,
cas_backend: Option<Arc<CasBackend>>,
) -> Result<NbdHandle> {
let _ = std::fs::remove_file(socket_path);
let listener = UnixListener::bind(socket_path)
.with_context(|| format!("failed to bind NBD socket: {}", socket_path))?;
let (shutdown_tx, shutdown_rx) = std::sync::mpsc::channel::<()>();
let socket_path_owned = socket_path.to_string();
let active_fd = Arc::new(AtomicI32::new(-1));
let active_fd_thread = active_fd.clone();
let thread = std::thread::Builder::new()
.name("shuru-nbd".into())
.spawn(move || {
info!("NBD server listening on {}", socket_path_owned);
loop {
match listener.accept() {
Ok((stream, _)) => {
if shutdown_rx.try_recv().is_ok() {
debug!("NBD server shutting down");
break;
}
use std::os::unix::io::AsRawFd;
let fd = stream.as_raw_fd();
active_fd_thread.store(fd, Ordering::Release);
info!("NBD client connected");
if let Err(e) = nbd::handle_client(stream, backend.clone()) {
warn!("NBD client session ended: {}", e);
}
active_fd_thread.store(-1, Ordering::Release);
debug!("NBD client disconnected, waiting for reconnect...");
}
Err(e) => {
if shutdown_rx.try_recv().is_ok() {
break;
}
warn!("NBD accept error: {}", e);
}
}
}
info!("NBD server stopped");
})?;
Ok(NbdHandle {
socket_path: socket_path.to_string(),
shutdown: Some(shutdown_tx),
thread: Some(thread),
cas_backend,
active_fd,
})
}
pub fn start_cas_nbd_server(
rootfs_path: &str,
cas_dir: &str,
index_path: &str,
socket_path: &str,
disk_size: u64,
) -> Result<NbdHandle> {
let store: Box<dyn ChunkStore> = Box::new(LocalChunkStore::open(cas_dir)?);
let (index, fallback, source_idx) = if Path::new(index_path).exists() {
info!("loading CAS index from {}", index_path);
let idx = ChunkIndex::load(index_path)?;
let fb = idx.fallback_path.as_ref().and_then(|p| {
FlatFileBackend::open(p).ok()
});
(idx, fb, Some(index_path.to_string()))
} else {
let fb = FlatFileBackend::open(rootfs_path)
.with_context(|| format!("failed to open rootfs for lazy ingestion: {}", rootfs_path))?;
let disk_size = fb.size();
info!("CAS: lazy mode, {} MB rootfs", disk_size / (1024 * 1024));
(ChunkIndex::new(disk_size), Some(fb), None)
};
let mut backend = if let Some(fb) = fallback {
CasBackend::with_fallback(store, index, fb)
} else {
CasBackend::new(store, index)
};
backend.source_index_path = source_idx;
if disk_size > 0 {
backend.set_disk_size(disk_size);
}
let cas = Arc::new(backend);
start_nbd_with_backend(cas.clone(), socket_path, Some(cas))
}