use alloc::collections::BTreeMap;
use alloc::format;
use alloc::string::{String, ToString};
use alloc::vec;
use alloc::vec::Vec;
use lazy_static::lazy_static;
use spin::Mutex;
use super::osd::OsdError;
use crate::FsError;
/// Base value added to every encoded object id produced by `DmuObjectStore::to_dmu_oid`.
const DISTRIBUTED_OBJECT_BASE: u64 = 0x1000_0000_0000_0000;
/// Extended-attribute namespace string. Not referenced by any code visible in this file,
/// hence the `dead_code` allowance.
#[allow(dead_code)]
const XATTR_NAMESPACE: &str = "lcpfs.osd";
/// Largest object payload accepted by `DmuObjectStore::write` / `write_at`, in bytes (1 GiB).
const MAX_OBJECT_SIZE: u64 = 1024 * 1024 * 1024;
/// Per-object bookkeeping that `DmuObjectStore` keeps alongside the payload bytes.
#[derive(Debug, Clone)]
pub struct ObjectMeta {
/// Object identifier; also the key under which the store files this record.
pub oid: u64,
/// Placement-group id passed in at construction (the store passes its own `pgid`).
pub pgid: u64,
/// Payload length in bytes as of the last write.
pub size: u64,
/// Starts at 1 and is incremented by every call to `update`.
pub version: u64,
/// 32-byte checksum of the payload, supplied by the caller of `new` / `update`.
pub checksum: [u8; 32],
/// `get_timestamp()` reading taken when the record was created.
pub ctime: u64,
/// `get_timestamp()` reading taken at the most recent `update` or xattr change.
pub mtime: u64,
/// Extended attributes, keyed by attribute name.
pub xattrs: BTreeMap<String, Vec<u8>>,
}
impl ObjectMeta {
pub fn new(oid: u64, pgid: u64, size: u64, checksum: [u8; 32]) -> Self {
let now = get_timestamp();
Self {
oid,
pgid,
size,
version: 1,
checksum,
ctime: now,
mtime: now,
xattrs: BTreeMap::new(),
}
}
pub fn update(&mut self, size: u64, checksum: [u8; 32]) {
self.size = size;
self.checksum = checksum;
self.version += 1;
self.mtime = get_timestamp();
}
}
/// Returns the next value of a process-wide logical clock.
///
/// This is not wall-clock time: the first call yields `1_704_067_200`
/// (2024-01-01T00:00:00Z expressed as Unix seconds) and every later call
/// yields a value one greater than the previous one.
fn get_timestamp() -> u64 {
    const FIRST_TICK: u64 = 1704067200;
    static NEXT_TICK: core::sync::atomic::AtomicU64 =
        core::sync::atomic::AtomicU64::new(FIRST_TICK);

    // `fetch_add` hands back the value *before* the increment.
    NEXT_TICK.fetch_add(1, core::sync::atomic::Ordering::Relaxed)
}
/// In-memory object store for one (OSD, placement group) pair.
///
/// Payload bytes and metadata live in two separate maps, both keyed by object id.
pub struct DmuObjectStore {
/// Identifier of the OSD this store belongs to.
osd_id: u64,
/// Placement-group id; passed to `ObjectMeta::new` for every object created here.
pgid: u64,
/// Metadata record for each stored object, keyed by object id.
metadata: BTreeMap<u64, ObjectMeta>,
/// Payload bytes for each stored object, keyed by object id.
data: BTreeMap<u64, Vec<u8>>,
/// Running operation counters for this store.
stats: DmuStoreStats,
}
/// Operation counters maintained by a `DmuObjectStore`; all fields default to zero.
#[derive(Debug, Clone, Default)]
pub struct DmuStoreStats {
/// Number of objects currently held (raised on first write of an id, lowered on delete).
pub object_count: u64,
/// Sum of the sizes of all objects currently held, in bytes.
pub total_bytes: u64,
/// Incremented by `read` whenever it copies a byte range out of an object
/// (reads that start at or past the end of the object are not counted).
pub reads: u64,
/// Incremented once per successful `write`.
pub writes: u64,
/// Incremented once per successful `delete`.
pub deletes: u64,
/// Incremented each time `read` finds that the payload no longer matches its stored checksum.
pub checksum_failures: u64,
}
impl DmuObjectStore {
    /// Creates an empty store for placement group `pgid` hosted on OSD `osd_id`.
    pub fn new(osd_id: u64, pgid: u64) -> Self {
        Self {
            osd_id,
            pgid,
            metadata: BTreeMap::new(),
            data: BTreeMap::new(),
            stats: DmuStoreStats::default(),
        }
    }

    /// Returns the id of the OSD this store belongs to.
    pub fn osd_id(&self) -> u64 {
        self.osd_id
    }

    /// Returns the placement-group id this store serves.
    pub fn pgid(&self) -> u64 {
        self.pgid
    }

    /// Encodes `oid` as `DISTRIBUTED_OBJECT_BASE + (low 32 bits of pgid << 32) + (low 32 bits of oid)`.
    ///
    /// NOTE(review): when the low 32 bits of `pgid` are `>= 0xF000_0000` the sum
    /// exceeds `u64::MAX` (a panic with overflow checks on, a wrap otherwise).
    /// The arithmetic is left untouched because the intended encoding for such
    /// ids is not visible from this file — confirm before changing.
    fn to_dmu_oid(&self, oid: u64) -> u64 {
        DISTRIBUTED_OBJECT_BASE + ((self.pgid & 0xFFFFFFFF) << 32) + (oid & 0xFFFFFFFF)
    }

    /// Stores `data` as the complete content of object `oid`, creating it if needed.
    ///
    /// Returns the object's version after the write (1 for a new object,
    /// previous version + 1 for an overwrite).
    ///
    /// # Errors
    ///
    /// `OsdError::IoError("Object too large")` when `data` exceeds `MAX_OBJECT_SIZE`.
    pub fn write(&mut self, oid: u64, data: Vec<u8>) -> Result<u64, OsdError> {
        let size = data.len() as u64;
        if size > MAX_OBJECT_SIZE {
            return Err(OsdError::IoError("Object too large".to_string()));
        }
        let checksum = compute_checksum(&data);

        let version = if let Some(meta) = self.metadata.get_mut(&oid) {
            // Overwrite: swap the old size for the new one in the byte total.
            let old_size = meta.size;
            meta.update(size, checksum);
            self.stats.total_bytes = self.stats.total_bytes.saturating_sub(old_size) + size;
            meta.version
        } else {
            let meta = ObjectMeta::new(oid, self.pgid, size, checksum);
            let version = meta.version;
            self.metadata.insert(oid, meta);
            self.stats.object_count += 1;
            self.stats.total_bytes += size;
            version
        };

        self.data.insert(oid, data);
        self.stats.writes += 1;
        Ok(version)
    }

    /// Writes `data` into object `oid` starting at byte `offset`.
    ///
    /// The object is created if it does not exist and grown if the write ends
    /// past its current length; any gap is zero-filled. The merged content is
    /// then stored through [`write`](Self::write), whose version is returned.
    ///
    /// # Errors
    ///
    /// `OsdError::IoError("Object too large")` when `offset + data.len()`
    /// overflows or the resulting object would exceed `MAX_OBJECT_SIZE`.
    pub fn write_at(&mut self, oid: u64, offset: u64, data: &[u8]) -> Result<u64, OsdError> {
        // `checked_add` so that a huge offset is rejected instead of overflowing
        // (which previously panicked, or wrapped and then sliced out of bounds).
        let write_end = match offset.checked_add(data.len() as u64) {
            Some(end) if end <= MAX_OBJECT_SIZE => end,
            _ => return Err(OsdError::IoError("Object too large".to_string())),
        };

        // Start from a single copy of the current content (empty for a new object).
        let mut full_data = self.data.get(&oid).cloned().unwrap_or_default();
        let new_size = core::cmp::max(write_end, full_data.len() as u64);
        if new_size > MAX_OBJECT_SIZE {
            return Err(OsdError::IoError("Object too large".to_string()));
        }

        // Both values are bounded by MAX_OBJECT_SIZE (2^30), so the casts are lossless.
        let start = offset as usize;
        let new_len = new_size as usize;
        if full_data.len() < new_len {
            full_data.resize(new_len, 0);
        }
        full_data[start..start + data.len()].copy_from_slice(data);

        self.write(oid, full_data)
    }

    /// Reads up to `length` bytes of object `oid` starting at `offset`.
    ///
    /// The range is clamped to the end of the object; a read that starts at or
    /// past the end returns an empty vector. When the read starts at offset 0
    /// and covers at least the recorded object size, the payload is verified
    /// against the stored checksum.
    ///
    /// # Errors
    ///
    /// * `OsdError::ObjectNotFound` if no payload is stored for `oid`.
    /// * `OsdError::ChecksumMismatch` if verification fails.
    pub fn read(&mut self, oid: u64, offset: u64, length: u64) -> Result<Vec<u8>, OsdError> {
        let stored = self.data.get(&oid).ok_or(OsdError::ObjectNotFound(oid))?;

        // Compare in u64 so that an offset above usize::MAX is not truncated first.
        if offset >= stored.len() as u64 {
            return Ok(Vec::new());
        }
        let start = offset as usize; // < stored.len(), so it fits
        let available = stored.len() - start;
        // Clamp the length instead of computing `offset + length`, which could overflow.
        let take = core::cmp::min(length, available as u64) as usize;
        let data = stored[start..start + take].to_vec();
        self.stats.reads += 1;

        if start == 0 {
            if let Some(meta) = self.metadata.get(&oid) {
                // Only a read covering the whole recorded size can be verified.
                if data.len() as u64 >= meta.size {
                    let computed = compute_checksum(&data[..meta.size as usize]);
                    if computed != meta.checksum {
                        self.stats.checksum_failures += 1;
                        return Err(OsdError::ChecksumMismatch {
                            oid,
                            expected: meta.checksum,
                            got: computed,
                        });
                    }
                }
            }
        }
        Ok(data)
    }

    /// Reads the whole of object `oid` (offset 0, length = recorded size).
    ///
    /// # Errors
    ///
    /// `OsdError::ObjectNotFound` if `oid` has no metadata; otherwise whatever
    /// [`read`](Self::read) returns.
    pub fn read_full(&mut self, oid: u64) -> Result<Vec<u8>, OsdError> {
        let size = self
            .metadata
            .get(&oid)
            .ok_or(OsdError::ObjectNotFound(oid))?
            .size;
        self.read(oid, 0, size)
    }

    /// Removes object `oid` together with its metadata and updates the counters.
    ///
    /// # Errors
    ///
    /// `OsdError::ObjectNotFound` if `oid` has no metadata.
    pub fn delete(&mut self, oid: u64) -> Result<(), OsdError> {
        let meta = self
            .metadata
            .remove(&oid)
            .ok_or(OsdError::ObjectNotFound(oid))?;
        self.data.remove(&oid);
        self.stats.object_count = self.stats.object_count.saturating_sub(1);
        self.stats.total_bytes = self.stats.total_bytes.saturating_sub(meta.size);
        self.stats.deletes += 1;
        Ok(())
    }

    /// Shrinks object `oid` to `new_size` bytes.
    ///
    /// A `new_size` greater than or equal to the current size is a no-op (the
    /// object is never extended). Shrinking goes through `read` + `write`, so it
    /// bumps the version and the read/write counters.
    ///
    /// # Errors
    ///
    /// `OsdError::ObjectNotFound` if `oid` has no metadata; otherwise whatever
    /// `read` / `write` return.
    pub fn truncate(&mut self, oid: u64, new_size: u64) -> Result<(), OsdError> {
        // Only the size is needed; avoid cloning the whole record and its xattrs.
        let current_size = self
            .metadata
            .get(&oid)
            .ok_or(OsdError::ObjectNotFound(oid))?
            .size;
        if new_size >= current_size {
            return Ok(());
        }
        let data = self.read(oid, 0, new_size)?;
        self.write(oid, data)?;
        Ok(())
    }

    /// Returns a copy of the metadata for `oid`.
    ///
    /// # Errors
    ///
    /// `OsdError::ObjectNotFound` if `oid` has no metadata.
    pub fn stat(&self, oid: u64) -> Result<ObjectMeta, OsdError> {
        self.metadata
            .get(&oid)
            .cloned()
            .ok_or(OsdError::ObjectNotFound(oid))
    }

    /// Returns `true` when metadata exists for `oid`.
    pub fn exists(&self, oid: u64) -> bool {
        self.metadata.contains_key(&oid)
    }

    /// Sets extended attribute `name` on `oid`, replacing any previous value,
    /// and refreshes the object's `mtime`.
    ///
    /// # Errors
    ///
    /// `OsdError::ObjectNotFound` if `oid` has no metadata.
    pub fn set_xattr(&mut self, oid: u64, name: &str, value: Vec<u8>) -> Result<(), OsdError> {
        let meta = self
            .metadata
            .get_mut(&oid)
            .ok_or(OsdError::ObjectNotFound(oid))?;
        meta.xattrs.insert(name.to_string(), value);
        meta.mtime = get_timestamp();
        Ok(())
    }

    /// Returns a copy of extended attribute `name` of `oid`.
    ///
    /// # Errors
    ///
    /// * `OsdError::ObjectNotFound` if `oid` has no metadata.
    /// * `OsdError::IoError` if the attribute is not set.
    pub fn get_xattr(&self, oid: u64, name: &str) -> Result<Vec<u8>, OsdError> {
        let meta = self
            .metadata
            .get(&oid)
            .ok_or(OsdError::ObjectNotFound(oid))?;
        meta.xattrs
            .get(name)
            .cloned()
            .ok_or_else(|| OsdError::IoError(format!("xattr '{}' not found", name)))
    }

    /// Removes extended attribute `name` from `oid` and refreshes its `mtime`.
    ///
    /// # Errors
    ///
    /// * `OsdError::ObjectNotFound` if `oid` has no metadata.
    /// * `OsdError::IoError` if the attribute is not set (`mtime` is then untouched).
    pub fn remove_xattr(&mut self, oid: u64, name: &str) -> Result<(), OsdError> {
        let meta = self
            .metadata
            .get_mut(&oid)
            .ok_or(OsdError::ObjectNotFound(oid))?;
        if meta.xattrs.remove(name).is_none() {
            return Err(OsdError::IoError(format!("xattr '{}' not found", name)));
        }
        meta.mtime = get_timestamp();
        Ok(())
    }

    /// Returns the ids of all stored objects in ascending order.
    pub fn list_objects(&self) -> Vec<u64> {
        self.metadata.keys().copied().collect()
    }

    /// Returns the store's running counters.
    pub fn get_stats(&self) -> &DmuStoreStats {
        &self.stats
    }

    /// No-op that always succeeds; this store keeps everything in memory.
    pub fn sync(&self) -> Result<(), OsdError> {
        Ok(())
    }

    /// Re-reads every object and checks it against its recorded checksum.
    ///
    /// Objects whose payload fails verification are counted in
    /// `checksum_errors`; objects that cannot be read for any other reason are
    /// counted in `read_errors`. Either way the id is appended to `error_oids`.
    /// Scrubbing goes through [`read`](Self::read), so it also advances the
    /// `reads` / `checksum_failures` counters.
    pub fn scrub(&mut self) -> Result<ScrubResult, OsdError> {
        let mut result = ScrubResult::default();

        // Snapshot what is needed up front so `read` can borrow `self` mutably.
        let targets: Vec<(u64, u64, [u8; 32])> = self
            .metadata
            .iter()
            .map(|(oid, meta)| (*oid, meta.size, meta.checksum))
            .collect();

        for (oid, size, expected_checksum) in targets {
            result.objects_scanned += 1;
            match self.read(oid, 0, size) {
                Ok(data) => {
                    // `read` skips verification when the payload is shorter than
                    // the recorded size, so compare again here.
                    if compute_checksum(&data) != expected_checksum {
                        result.checksum_errors += 1;
                        result.error_oids.push(oid);
                    }
                    result.bytes_scanned += data.len() as u64;
                }
                // `read` verifies full reads itself; classify its verdict as a
                // checksum error rather than a generic read failure.
                Err(OsdError::ChecksumMismatch { .. }) => {
                    result.checksum_errors += 1;
                    result.error_oids.push(oid);
                }
                Err(_) => {
                    result.read_errors += 1;
                    result.error_oids.push(oid);
                }
            }
        }
        Ok(result)
    }
}
/// Summary produced by `DmuObjectStore::scrub`; all counters default to zero.
#[derive(Debug, Clone, Default)]
pub struct ScrubResult {
/// Number of objects the scrub visited.
pub objects_scanned: u64,
/// Total number of payload bytes returned by successful reads during the scrub.
pub bytes_scanned: u64,
/// Number of objects the scrub counted as failing checksum comparison.
pub checksum_errors: u64,
/// Number of objects the scrub counted as unreadable.
pub read_errors: u64,
/// Ids of every object counted in `checksum_errors` or `read_errors`.
pub error_oids: Vec<u64>,
}
/// An object payload bundled with addressing information and a checksum.
#[derive(Debug, Clone)]
pub struct ReplicationRequest {
/// Set from the `source_osd` argument of `from_object`.
pub source_osd: u64,
/// Set from the `target_osd` argument of `from_object`.
pub target_osd: u64,
/// Placement-group id the object belongs to.
pub pgid: u64,
/// Object identifier.
pub oid: u64,
/// Object version; `from_object` always sets this to 1.
pub version: u64,
/// Object payload bytes.
pub data: Vec<u8>,
/// Checksum of `data` as computed by `compute_checksum`; checked by `verify`.
pub checksum: [u8; 32],
}
impl ReplicationRequest {
pub fn from_object(
source_osd: u64,
target_osd: u64,
pgid: u64,
oid: u64,
data: Vec<u8>,
) -> Self {
let checksum = compute_checksum(&data);
Self {
source_osd,
target_osd,
pgid,
oid,
version: 1,
data,
checksum,
}
}
pub fn verify(&self) -> bool {
compute_checksum(&self.data) == self.checksum
}
}
/// Acknowledgement record for a replication request.
///
/// NOTE(review): nothing in the visible code constructs or reads this type, so the
/// field descriptions below are presumed from the field names — confirm against callers.
#[derive(Debug, Clone)]
pub struct ReplicationAck {
/// Presumably the id of the OSD sending the acknowledgement.
pub osd_id: u64,
/// Presumably the id of the object being acknowledged.
pub oid: u64,
/// Presumably the object version being acknowledged.
pub version: u64,
/// Presumably whether the replication succeeded.
pub success: bool,
/// Optional error text; presumably populated when `success` is `false`.
pub error: Option<String>,
}
// Global registry of object stores, keyed by `(osd_id, pgid)`.
// The whole map sits behind a single `spin::Mutex`, so every free function below
// that touches a store holds this lock for the duration of the operation.
lazy_static! {
static ref STORE_REGISTRY: Mutex<BTreeMap<(u64, u64), DmuObjectStore>> =
Mutex::new(BTreeMap::new());
}
/// Makes sure a store is registered for `(osd_id, pgid)` and returns the
/// registry mutex.
///
/// The return value is the mutex guarding the *whole* registry, not the
/// individual store: callers must lock it and look the `(osd_id, pgid)` key up
/// themselves. The lock taken here is released before returning.
pub fn get_or_create_store(
    osd_id: u64,
    pgid: u64,
) -> &'static Mutex<BTreeMap<(u64, u64), DmuObjectStore>> {
    let key = (osd_id, pgid);

    let mut stores = STORE_REGISTRY.lock();
    stores
        .entry(key)
        .or_insert_with(|| DmuObjectStore::new(osd_id, pgid));
    // Release the guard explicitly so the caller can lock the registry again.
    drop(stores);

    &STORE_REGISTRY
}
/// Writes `data` as the full content of object `oid` in the store for
/// `(osd_id, pgid)`, registering that store first if it does not exist yet.
///
/// Returns the object's version after the write. The registry lock is held
/// for the whole operation.
///
/// # Errors
///
/// Whatever `DmuObjectStore::write` returns.
pub fn write_object(osd_id: u64, pgid: u64, oid: u64, data: Vec<u8>) -> Result<u64, OsdError> {
    let key = (osd_id, pgid);
    let mut stores = STORE_REGISTRY.lock();
    let target = stores
        .entry(key)
        .or_insert_with(|| DmuObjectStore::new(osd_id, pgid));
    let outcome = target.write(oid, data);
    outcome
}
pub fn read_object(osd_id: u64, pgid: u64, oid: u64) -> Result<Vec<u8>, OsdError> {
let mut registry = STORE_REGISTRY.lock();
let store = registry
.entry((osd_id, pgid))
.or_insert_with(|| DmuObjectStore::new(osd_id, pgid));
store.read_full(oid)
}
/// Deletes object `oid` from the store registered for `(osd_id, pgid)`.
///
/// # Errors
///
/// * `OsdError::PgNotFound(pgid)` if no store is registered for the pair.
/// * Whatever `DmuObjectStore::delete` returns otherwise.
pub fn delete_object(osd_id: u64, pgid: u64, oid: u64) -> Result<(), OsdError> {
    let mut stores = STORE_REGISTRY.lock();
    match stores.get_mut(&(osd_id, pgid)) {
        None => Err(OsdError::PgNotFound(pgid)),
        Some(target) => target.delete(oid),
    }
}
/// Sums the counters of every registered store that belongs to `osd_id`.
///
/// Returns all-zero statistics when the OSD has no stores.
pub fn get_osd_stats(osd_id: u64) -> DmuStoreStats {
    let stores = STORE_REGISTRY.lock();
    let combined = stores
        .iter()
        .filter(|((owner, _pg), _store)| *owner == osd_id)
        .map(|(_key, store)| store.get_stats())
        .fold(DmuStoreStats::default(), |mut acc, part| {
            acc.object_count += part.object_count;
            acc.total_bytes += part.total_bytes;
            acc.reads += part.reads;
            acc.writes += part.writes;
            acc.deletes += part.deletes;
            acc.checksum_failures += part.checksum_failures;
            acc
        });
    combined
}
/// Returns the 32-byte BLAKE3 digest of `data`.
fn compute_checksum(data: &[u8]) -> [u8; 32] {
    // One-shot helper; equivalent to feeding `data` to a fresh `Hasher`.
    let digest = blake3::hash(data);
    *digest.as_bytes()
}
/// Translates a filesystem-layer error into the OSD error type.
///
/// `NotFound` becomes `ObjectNotFound(0)` (no object id is available at this
/// point); every other variant becomes an `IoError` carrying a short message.
#[allow(dead_code)]
fn convert_fs_error(err: FsError) -> OsdError {
    let message = match err {
        // The only variant that does not map onto `IoError`.
        FsError::NotFound => return OsdError::ObjectNotFound(0),
        FsError::IoError { reason, .. } => reason.to_string(),
        FsError::DiskFull { .. } => "Disk full".to_string(),
        FsError::PermissionDenied => "Permission denied".to_string(),
        _ => "Storage error".to_string(),
    };
    OsdError::IoError(message)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A new store remembers its identifiers and starts with no objects.
    #[test]
    fn test_dmu_store_creation() {
        let fresh = DmuObjectStore::new(1, 100);
        assert_eq!(fresh.osd_id, 1);
        assert_eq!(fresh.pgid, 100);
        assert_eq!(fresh.stats.object_count, 0);
    }

    /// The encoded id carries the base, the pgid in the upper half and the oid
    /// in the lower half.
    #[test]
    fn test_dmu_oid_encoding() {
        const PGID: u64 = 0x1234;
        const OID: u64 = 0x5678;

        let store = DmuObjectStore::new(1, PGID);
        let encoded = store.to_dmu_oid(OID);

        assert!(encoded >= DISTRIBUTED_OBJECT_BASE);
        assert_eq!(encoded & 0xFFFFFFFF, OID);

        let pg_part = encoded - DISTRIBUTED_OBJECT_BASE - OID;
        assert_eq!((pg_part >> 32) & 0xFFFFFFFF, PGID);
    }

    /// `ObjectMeta::new` copies its arguments and starts at version 1.
    #[test]
    fn test_object_metadata() {
        let zero_sum = [0u8; 32];
        let record = ObjectMeta::new(100, 1, 1024, zero_sum);
        assert_eq!(record.oid, 100);
        assert_eq!(record.pgid, 1);
        assert_eq!(record.size, 1024);
        assert_eq!(record.version, 1);
    }

    /// `update` replaces size and checksum and bumps the version.
    #[test]
    fn test_metadata_update() {
        let mut record = ObjectMeta::new(100, 1, 1024, [0u8; 32]);
        let replacement_sum = [1u8; 32];

        record.update(2048, replacement_sum);

        assert_eq!(record.size, 2048);
        assert_eq!(record.version, 2);
        assert_eq!(record.checksum, replacement_sum);
    }

    /// A freshly built request carries its addressing fields and verifies.
    #[test]
    fn test_replication_request() {
        let payload = vec![1, 2, 3, 4, 5];
        let request = ReplicationRequest::from_object(1, 2, 100, 50, payload);
        assert_eq!(request.source_osd, 1);
        assert_eq!(request.target_osd, 2);
        assert_eq!(request.pgid, 100);
        assert_eq!(request.oid, 50);
        assert!(request.verify());
    }

    /// Tampering with the payload makes verification fail.
    #[test]
    fn test_replication_verify_fails_on_corruption() {
        let mut request = ReplicationRequest::from_object(1, 2, 100, 50, vec![1, 2, 3, 4, 5]);
        request.data[0] = 99;
        assert!(!request.verify());
    }

    /// The default scrub result is all zeros with no error ids.
    #[test]
    fn test_scrub_result_default() {
        let empty = ScrubResult::default();
        assert_eq!(empty.objects_scanned, 0);
        assert_eq!(empty.checksum_errors, 0);
        assert!(empty.error_oids.is_empty());
    }

    /// Equal inputs hash equally; different inputs hash differently.
    #[test]
    fn test_checksum_computation() {
        let first = compute_checksum(b"hello world");
        let same_again = compute_checksum(b"hello world");
        let other = compute_checksum(b"different");
        assert_eq!(first, same_again);
        assert_ne!(first, other);
    }

    /// Default statistics start at zero.
    #[test]
    fn test_store_stats_default() {
        let zeroed = DmuStoreStats::default();
        assert_eq!(zeroed.object_count, 0);
        assert_eq!(zeroed.total_bytes, 0);
        assert_eq!(zeroed.reads, 0);
        assert_eq!(zeroed.writes, 0);
    }
}