use alloc::string::{String, ToString};
use alloc::vec;
use alloc::vec::Vec;
use super::DEFRAG_TASKS;
use super::extents::{
calculate_fragmentation, find_contiguous_space, get_extents, reserve_contiguous_space,
};
use super::types::{
DefragError, DefragOptions, DefragProgress, DefragStats, DefragStatus, DefragTask,
};
use crate::fscore::structs::Dva;
use crate::storage::dmu::get_block_size;
use crate::storage::zpl::ZPL;
use crate::util::alloc::ALLOCATOR;
/// Source of identifiers for background defrag tasks. The counter starts at 1
/// and is advanced by one each time `start_background_defrag` queues a task.
static NEXT_TASK_ID: core::sync::atomic::AtomicU64 = core::sync::atomic::AtomicU64::new(1);
/// Defragments a single object by relocating it into one contiguous region.
///
/// The returned [`DefragStats`] describe what happened to this one file:
///
/// * An object without extents yields default (all-zero) stats.
/// * The file is reported as skipped (`files_skipped = 1`, `extents_after`
///   equal to `extents_before`) when its fragmentation is below
///   `options.min_fragmentation`, when its size is below
///   `options.min_file_size`, when `options.max_file_size` is non-zero and
///   exceeded, or when `options.skip_open_files` is set and the znode is dirty.
/// * With `options.dry_run` set, a file that passes every filter above is
///   reported as if it had been moved, but nothing is reserved or rewritten.
/// * Otherwise contiguous space for the whole file is reserved and the object
///   is relocated via `defrag_file_cow`.
///
/// # Errors
///
/// Propagates failures from `get_extents`, `reserve_contiguous_space` and
/// `defrag_file_cow`. When the relocation itself fails, the space reserved for
/// it is handed back to the allocator before the error is returned.
pub fn defrag_file(
    dataset: &str,
    object_id: u64,
    options: &DefragOptions,
) -> Result<DefragStats, DefragError> {
    // Marks `stats` as describing a skipped file whose layout is unchanged.
    fn skipped(mut stats: DefragStats) -> DefragStats {
        stats.files_skipped = 1;
        stats.extents_after = stats.extents_before;
        stats
    }

    let mut stats = DefragStats::default();

    let extents = get_extents(dataset, object_id)?;
    if extents.is_empty() {
        return Ok(stats);
    }
    stats.extents_before = extents.len() as u64;

    if calculate_fragmentation(&extents) < options.min_fragmentation {
        return Ok(skipped(stats));
    }

    let total_size: u64 = extents.iter().map(|e| e.length).sum();
    let too_small = total_size < options.min_file_size;
    // A `max_file_size` of zero means "no upper limit".
    let too_large = options.max_file_size > 0 && total_size > options.max_file_size;
    if too_small || too_large {
        return Ok(skipped(stats));
    }

    // Evaluated before the dry-run shortcut so that a dry run predicts the
    // same skip decision a real run would make for a dirty file.
    if options.skip_open_files {
        let zpl = ZPL.lock();
        let is_dirty = zpl.get_znode(object_id).map_or(false, |znode| znode.dirty);
        // Release the ZPL lock before touching the allocator below.
        drop(zpl);
        if is_dirty {
            return Ok(skipped(stats));
        }
    }

    if options.dry_run {
        stats.files_processed = 1;
        stats.files_defragmented = 1;
        stats.bytes_moved = total_size;
        stats.extents_after = 1;
        return Ok(stats);
    }

    let (target_device, target_offset) = reserve_contiguous_space(dataset, total_size)?;

    if let Err(e) = defrag_file_cow(
        object_id,
        &extents,
        target_device,
        target_offset,
        total_size,
    ) {
        // The relocation did not happen, so give the reservation back.
        let mut allocator = ALLOCATOR.lock();
        allocator.free(
            Dva {
                vdev: target_device,
                offset: target_offset,
            },
            total_size,
        );
        return Err(e);
    }

    stats.files_processed = 1;
    stats.files_defragmented = 1;
    stats.bytes_moved = total_size;
    stats.extents_after = 1;
    crate::lcpfs_println!(
        "[ DEFRAG ] Defragmented object {} ({} bytes, {} -> 1 extents)",
        object_id,
        total_size,
        extents.len()
    );

    Ok(stats)
}
/// Re-points an object at a freshly reserved region and releases its old extents.
///
/// Steps, as implemented here:
///
/// 1. Snapshot the object's bytes from `znode.data_cache`. If the znode has no
///    cache, a zero-filled buffer as long as all `extents` combined is used.
/// 2. Pad the buffer with zeros up to `total_size`, set `znode.master_node` to
///    `(target_device, target_offset)`, install the buffer as the new
///    `data_cache` and mark the znode dirty.
/// 3. Free every old extent, except those with `pstart == 0` and the extent
///    that already sits exactly at the target location.
///
/// NOTE(review): when there is no cached data the file content is replaced by
/// zeros rather than read back from the old extents, and those extents are then
/// freed. No block-read API is visible from this function — confirm whether the
/// missing read is intentional before relying on this path.
///
/// # Errors
///
/// Returns `DefragError::ObjectNotFound` if the znode cannot be looked up in
/// either the snapshot or the update phase. Nothing has been modified or freed
/// when that happens.
fn defrag_file_cow(
    object_id: u64,
    extents: &[super::types::Extent],
    target_device: u32,
    target_offset: u64,
    total_size: u64,
) -> Result<(), DefragError> {
    // Phase 1: take a private copy of the data under the ZPL lock.
    let zpl = ZPL.lock();
    let mut final_data: Vec<u8> = match zpl.get_znode(object_id) {
        Some(znode) => match znode.data_cache {
            Some(ref cache) => cache.clone(),
            None => {
                let zero_len: usize = extents.iter().map(|e| e.length as usize).sum();
                vec![0u8; zero_len]
            }
        },
        None => return Err(DefragError::ObjectNotFound(object_id)),
    };
    drop(zpl);

    // Only ever grow the buffer; a longer cached buffer is kept as-is.
    if final_data.len() < total_size as usize {
        final_data.resize(total_size as usize, 0);
    }

    // Phase 2: switch the znode over to the new location.
    {
        let mut zpl = ZPL.lock();
        let znode = zpl
            .get_znode_mut(object_id)
            .ok_or(DefragError::ObjectNotFound(object_id))?;
        znode.master_node = Dva {
            vdev: target_device,
            offset: target_offset,
        };
        znode.data_cache = Some(final_data);
        znode.dirty = true;
    }

    // Phase 3: release the old extents.
    let mut allocator = ALLOCATOR.lock();
    for extent in extents {
        // The target region is identified by device AND offset. Comparing the
        // offset alone would leak an old extent that happens to start at the
        // same offset on a different device.
        let is_target = extent.device_id == target_device && extent.pstart == target_offset;
        if extent.pstart != 0 && !is_target {
            allocator.free(
                Dva {
                    vdev: extent.device_id,
                    offset: extent.pstart,
                },
                extent.length,
            );
        }
    }

    Ok(())
}
/// Defragments the files of `dataset`, most fragmented first.
///
/// Object ids 1 through 999 are probed in the ZPL. Regular files that pass the
/// size limits in `options` (and are not dirty when `skip_open_files` is set)
/// become candidates. Each candidate is scored with `calculate_fragmentation`,
/// candidates below `options.min_fragmentation` are dropped, and the rest are
/// handed to `defrag_file` in descending score order, limited to
/// `options.max_files` when that is non-zero.
///
/// Per-file failures are logged and counted as skipped; they do not abort the
/// run, so this function currently always returns `Ok` with aggregated stats.
pub fn defrag_dataset(dataset: &str, options: &DefragOptions) -> Result<DefragStats, DefragError> {
    let mut total_stats = DefragStats::default();
    let started_at = crate::time::monotonic();
    crate::lcpfs_println!("[ DEFRAG ] Starting dataset defragmentation: {}", dataset);

    // (object id, fragmentation score, file size)
    let mut candidates: Vec<(u64, u8, u64)> = Vec::new();
    {
        let zpl = ZPL.lock();
        for object_id in 1..1000 {
            let znode = match zpl.get_znode(object_id) {
                Some(znode) => znode,
                None => continue,
            };
            if !znode.is_file() {
                continue;
            }
            let size = znode.phys.size;
            let too_small = size < options.min_file_size;
            let too_large = options.max_file_size > 0 && size > options.max_file_size;
            let busy = options.skip_open_files && znode.dirty;
            if !(too_small || too_large || busy) {
                // Score is filled in below, once the ZPL lock is released.
                candidates.push((object_id, 0, size));
            }
        }
    }

    // A candidate whose extents cannot be read keeps its score of 0.
    for candidate in candidates.iter_mut() {
        if let Ok(extents) = get_extents(dataset, candidate.0) {
            candidate.1 = calculate_fragmentation(&extents);
        }
    }

    candidates.retain(|candidate| candidate.1 >= options.min_fragmentation);
    // Stable sort, highest fragmentation first.
    candidates.sort_by_key(|candidate| core::cmp::Reverse(candidate.1));

    // A `max_files` of zero means "no limit".
    let max_files = match options.max_files {
        0 => candidates.len(),
        limit => limit,
    };
    crate::lcpfs_println!(
        "[ DEFRAG ] Found {} candidates, processing up to {}",
        candidates.len(),
        max_files
    );

    let planned = max_files.min(candidates.len());
    for (index, &(object_id, frag_score, file_size)) in
        candidates.iter().take(max_files).enumerate()
    {
        // Background mode has no per-file hook here yet.
        crate::lcpfs_println!(
            "[ DEFRAG ] Processing {}/{}: object {} (frag={}, size={})",
            index + 1,
            planned,
            object_id,
            frag_score,
            file_size
        );

        match defrag_file(dataset, object_id, options) {
            Ok(file_stats) => {
                total_stats.files_processed += file_stats.files_processed;
                total_stats.files_defragmented += file_stats.files_defragmented;
                total_stats.files_skipped += file_stats.files_skipped;
                total_stats.bytes_moved += file_stats.bytes_moved;
                total_stats.extents_before += file_stats.extents_before;
                total_stats.extents_after += file_stats.extents_after;
            }
            Err(e) => {
                crate::lcpfs_println!("[ DEFRAG ] Failed to defrag object {}: {:?}", object_id, e);
                total_stats.files_skipped += 1;
            }
        }
        // `options.throttle_ms` is not acted upon: no delay is inserted
        // between files, even in background mode.
    }

    let finished_at = crate::time::monotonic();
    // NOTE(review): the factor 10 presumably converts timer ticks to
    // milliseconds — confirm against `crate::time::monotonic`.
    total_stats.duration_ms = (finished_at - started_at) * 10;
    crate::lcpfs_println!(
        "[ DEFRAG ] Complete: {} files, {} defragmented, {} bytes moved, {}ms",
        total_stats.files_processed,
        total_stats.files_defragmented,
        total_stats.bytes_moved,
        total_stats.duration_ms
    );

    Ok(total_stats)
}
/// Queues a defragmentation task for `dataset` and returns its identifier.
///
/// A fresh id is drawn from `NEXT_TASK_ID` and the new task is appended to
/// `DEFRAG_TASKS`. This function only registers the task; it does not run it.
/// It currently always returns `Ok`.
pub fn start_background_defrag(dataset: &str, options: DefragOptions) -> Result<u64, DefragError> {
    use core::sync::atomic::Ordering;

    let id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed);
    // Build the task before taking the lock so the lock is held only for the push.
    let queued = DefragTask::new(id, dataset.to_string(), options);
    DEFRAG_TASKS.lock().push(queued);
    Ok(id)
}
/// Requests cancellation of the task identified by `task_id`.
///
/// Only a task that is still `Pending` or `Running` can be cancelled: its
/// `cancel_requested` flag is set and its status becomes `Cancelled`.
///
/// # Errors
///
/// Returns `DefragError::TaskNotFound` when no task with that id exists, and
/// also when the task exists but is in any other state.
pub fn cancel_defrag_task(task_id: u64) -> Result<(), DefragError> {
    let mut tasks = DEFRAG_TASKS.lock();
    let cancellable = tasks.iter_mut().find(|task| {
        task.id == task_id
            && (task.status == DefragStatus::Running || task.status == DefragStatus::Pending)
    });

    match cancellable {
        Some(task) => {
            task.cancel_requested = true;
            task.status = DefragStatus::Cancelled;
            Ok(())
        }
        None => Err(DefragError::TaskNotFound(task_id)),
    }
}
/// Returns a snapshot of the progress of the task identified by `task_id`.
///
/// The snapshot copies the task's status, processed-file and moved-byte
/// counters, current file and error text. `files_total` is always reported
/// as 0 by this implementation.
///
/// # Errors
///
/// Returns `DefragError::TaskNotFound` when no task has the given id.
pub fn get_defrag_progress(task_id: u64) -> Result<DefragProgress, DefragError> {
    let tasks = DEFRAG_TASKS.lock();
    let found = tasks.iter().find(|task| task.id == task_id);

    match found {
        Some(task) => Ok(DefragProgress {
            task_id,
            status: task.status.clone(),
            files_processed: task.stats.files_processed,
            files_total: 0,
            bytes_moved: task.stats.bytes_moved,
            current_file: task.current_file.clone(),
            error: task.error.clone(),
        }),
        None => Err(DefragError::TaskNotFound(task_id)),
    }
}
/// Executes a previously queued defrag task to completion.
///
/// The task is marked `Running`, `defrag_dataset` is invoked with the task's
/// dataset and options (without holding the task-list lock), and the outcome
/// is written back: stats and `Completed` on success, the error text and
/// `Failed` on failure. If the task has disappeared from the list in the
/// meantime, the write-back is silently skipped.
///
/// # Errors
///
/// * `DefragError::TaskNotFound` if no task has the given id.
/// * `DefragError::Cancelled` if cancellation was requested before the start.
/// * Any error returned by `defrag_dataset`.
fn run_defrag_task(task_id: u64) -> Result<(), DefragError> {
    // Claim the task and copy out what the run needs, then release the lock.
    let (dataset, options) = {
        let mut tasks = DEFRAG_TASKS.lock();
        let task = match tasks.iter_mut().find(|t| t.id == task_id) {
            Some(task) => task,
            None => return Err(DefragError::TaskNotFound(task_id)),
        };
        if task.cancel_requested {
            return Err(DefragError::Cancelled);
        }
        task.status = DefragStatus::Running;
        (task.dataset.clone(), task.options.clone())
    };

    let outcome = defrag_dataset(&dataset, &options);

    // Record the result on the task, if it is still registered.
    let mut tasks = DEFRAG_TASKS.lock();
    let slot = tasks.iter_mut().find(|t| t.id == task_id);
    match outcome {
        Ok(stats) => {
            if let Some(task) = slot {
                task.stats = stats;
                task.status = DefragStatus::Completed;
            }
            Ok(())
        }
        Err(e) => {
            if let Some(task) = slot {
                task.error = Some(e.to_string());
                task.status = DefragStatus::Failed;
            }
            Err(e)
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Queues a task with default options for the placeholder dataset "test"
    /// and returns its id.
    fn queue_task() -> u64 {
        start_background_defrag("test", DefragOptions::default()).unwrap()
    }

    /// Task ids start at 1, so a queued task never gets id 0.
    #[test]
    fn test_start_background() {
        assert!(queue_task() > 0);
    }

    /// A freshly queued task can be cancelled.
    #[test]
    fn test_cancel_task() {
        let id = queue_task();
        assert!(cancel_defrag_task(id).is_ok());
    }

    /// Progress lookups echo back the id of the task they describe.
    #[test]
    fn test_get_progress() {
        let id = queue_task();
        let snapshot = get_defrag_progress(id).unwrap();
        assert_eq!(snapshot.task_id, id);
    }
}