#![cfg_attr(not(feature = "std"), no_std)]
extern crate alloc;
use alloc::boxed::Box;
use alloc::collections::BTreeMap;
use alloc::string::{String, ToString};
use alloc::vec;
use alloc::vec::Vec;
use core::fmt;
#[cfg(feature = "std")]
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
#[cfg(not(feature = "std"))]
use core::sync::atomic::{AtomicBool, AtomicU64, Ordering};
/// Static configuration for a single OSD instance.
///
/// Within this file only `id` is consumed (it is copied by `Osd::new`); the
/// other fields are stored unchanged and their consumers are not visible here.
#[derive(Debug, Clone)]
pub struct OsdConfig {
    /// Identifier of this OSD.
    pub id: u64,
    /// Name of the cluster this OSD belongs to.
    pub cluster_name: String,
    /// Location of the object data.
    pub data_path: String,
    /// Optional separate journal location.
    pub journal_path: Option<String>,
    /// Operation limit (presumably concurrent ops; not enforced in this file).
    pub max_ops: usize,
    /// Heartbeat period; the `_ms` suffix suggests milliseconds.
    pub heartbeat_interval_ms: u64,
    /// Replication timeout; the `_ms` suffix suggests milliseconds.
    pub replication_timeout_ms: u64,
    /// Whether scrubbing is enabled.
    pub scrub_enabled: bool,
    /// Scrub period; the `_secs` suffix suggests seconds.
    pub scrub_interval_secs: u64,
}

impl Default for OsdConfig {
    /// Builds the configuration for OSD 0 of the `lcpfs` cluster with no journal.
    fn default() -> Self {
        let cluster_name = String::from("lcpfs");
        let data_path = String::from("/var/lib/lcpfs/osd.0");
        OsdConfig {
            id: 0,
            cluster_name,
            data_path,
            journal_path: None,
            max_ops: 128,
            heartbeat_interval_ms: 1_000,
            replication_timeout_ms: 30_000,
            scrub_enabled: true,
            scrub_interval_secs: 86_400,
        }
    }
}
/// Operation carried by an [`OsdMessage`] and dispatched by `Osd::execute_op`.
#[derive(Debug, Clone)]
pub enum OsdOp {
/// Read up to `length` bytes of object `oid` starting at `offset`.
Read {
oid: u64,
offset: u64,
length: u64,
},
/// Overwrite part of object `oid` at `offset`; the object is created or
/// zero-extended as needed.
Write {
oid: u64,
offset: u64,
data: Vec<u8>,
},
/// Replace the entire contents of object `oid`.
WriteFull {
oid: u64,
data: Vec<u8>,
},
/// Remove object `oid`.
Delete {
oid: u64,
},
/// Shorten object `oid` to `size` bytes (implemented with `Vec::truncate`,
/// so a larger `size` does not extend the data).
Truncate {
oid: u64,
size: u64,
},
/// Fetch size, mtime, version and checksum of object `oid`.
Stat {
oid: u64,
},
/// Set extended attribute `name` on object `oid`.
SetXattr {
oid: u64,
name: String,
value: Vec<u8>,
},
/// Read extended attribute `name` of object `oid`.
GetXattr {
oid: u64,
name: String,
},
/// Remove extended attribute `name` from object `oid`.
RemoveXattr {
oid: u64,
name: String,
},
/// List at most `max` objects with ids strictly greater than `marker`.
List {
max: usize,
marker: Option<u64>,
},
/// Replication request; in this file it only checks that the object exists
/// and bumps a counter, no data is sent.
Replicate {
oid: u64,
target_osd: u64,
},
/// Recovery pull; in this file it only bumps a counter.
Pull {
oid: u64,
source_osd: u64,
},
/// Verify the checksum of one object, or of every object when `oid` is `None`.
Scrub {
oid: Option<u64>,
},
}
/// Request envelope processed by `Osd::handle_op`.
#[derive(Debug, Clone)]
pub struct OsdMessage {
/// Sender's map epoch; compared against the OSD's current epoch.
pub epoch: u64,
/// Placement group the operation targets.
pub pgid: u64,
/// The operation to run.
pub op: OsdOp,
/// Caller-chosen id echoed back in the [`OsdResponse`].
pub request_id: u64,
/// Originating OSD, if any (not read in this file).
pub source_osd: Option<u64>,
/// Per-request behavior flags.
pub flags: OsdOpFlags,
}
/// Per-request flags; all default to `false`.
///
/// Only `ignore_state` is read in this file (by `Osd::handle_op`).
#[derive(Debug, Clone, Copy, Default)]
pub struct OsdOpFlags {
pub ack_on_primary: bool,
pub skip_cache: bool,
pub recovery: bool,
pub scrub: bool,
/// Skips the stale-epoch check and the PG read/write state checks.
pub ignore_state: bool,
}
/// Reply produced by `Osd::handle_op`.
#[derive(Debug, Clone)]
pub struct OsdResponse {
/// Copied from the request's `request_id`.
pub request_id: u64,
/// Outcome of the operation.
pub result: OsdResult,
/// OSD epoch observed when the request was handled.
pub epoch: u64,
}
/// Outcome of an operation.
#[derive(Debug, Clone)]
pub enum OsdResult {
/// Operation succeeded; reads carry the bytes, other ops carry `None`.
Success(Option<Vec<u8>>),
/// Result of `OsdOp::Stat`.
Stat(ObjectStat),
/// Result of `OsdOp::List`.
List(Vec<ObjectInfo>),
/// Result of `OsdOp::GetXattr`.
Xattr(Vec<u8>),
/// Operation failed.
Error(OsdError),
}
/// Object metadata returned by `LocalStore::stat`.
#[derive(Debug, Clone)]
pub struct ObjectStat {
pub oid: u64,
/// Data length in bytes.
pub size: u64,
pub mtime: u64,
pub version: u64,
/// Stored checksum of the object data (32 bytes).
pub checksum: [u8; 32],
}
/// Summary entry returned by `LocalStore::list`.
#[derive(Debug, Clone)]
pub struct ObjectInfo {
pub oid: u64,
/// Data length in bytes.
pub size: u64,
pub version: u64,
}
/// Errors reported by the OSD and its local store.
#[derive(Debug, Clone)]
pub enum OsdError {
/// No object with this id (also returned by `LocalStore::get_xattr` when the
/// attribute is missing).
ObjectNotFound(u64),
/// The OSD does not host this placement group.
PgNotFound(u64),
/// The placement group's state does not permit the operation.
PgNotActive(u64, PgState),
/// This OSD is not the primary of the placement group.
NotPrimary(u64),
/// The request's epoch is older than the OSD's epoch.
StaleEpoch {
expected: u64,
got: u64,
},
IoError(String),
ReplicationFailed(String),
Timeout,
OsdFull,
InvalidOp(String),
/// Stored checksum (`expected`) differs from the recomputed one (`got`).
ChecksumMismatch {
oid: u64,
expected: [u8; 32],
got: [u8; 32],
},
}
impl fmt::Display for OsdError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
OsdError::ObjectNotFound(oid) => write!(f, "Object {} not found", oid),
OsdError::PgNotFound(pgid) => write!(f, "PG {} not found", pgid),
OsdError::PgNotActive(pgid, state) => write!(f, "PG {} is {:?}", pgid, state),
OsdError::NotPrimary(pgid) => write!(f, "Not primary for PG {}", pgid),
OsdError::StaleEpoch { expected, got } => {
write!(f, "Stale epoch: expected {}, got {}", expected, got)
}
OsdError::IoError(msg) => write!(f, "I/O error: {}", msg),
OsdError::ReplicationFailed(msg) => write!(f, "Replication failed: {}", msg),
OsdError::Timeout => write!(f, "Operation timeout"),
OsdError::OsdFull => write!(f, "OSD is full"),
OsdError::InvalidOp(msg) => write!(f, "Invalid operation: {}", msg),
OsdError::ChecksumMismatch { oid, .. } => {
write!(f, "Checksum mismatch for object {}", oid)
}
}
}
}
/// Lifecycle state of a placement group.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PgState {
    Active,
    Degraded,
    Recovering,
    Backfilling,
    Peering,
    Inactive,
    Creating,
    Incomplete,
    Stale,
    Remapped,
    Inconsistent,
}

impl PgState {
    /// Returns `true` for the states in which reads are served.
    pub fn can_read(&self) -> bool {
        match *self {
            PgState::Active
            | PgState::Degraded
            | PgState::Recovering
            | PgState::Backfilling => true,
            PgState::Peering
            | PgState::Inactive
            | PgState::Creating
            | PgState::Incomplete
            | PgState::Stale
            | PgState::Remapped
            | PgState::Inconsistent => false,
        }
    }

    /// Returns `true` for the states in which writes are accepted
    /// (the readable states minus `Backfilling`).
    pub fn can_write(&self) -> bool {
        match *self {
            PgState::Active | PgState::Degraded | PgState::Recovering => true,
            PgState::Backfilling
            | PgState::Peering
            | PgState::Inactive
            | PgState::Creating
            | PgState::Incomplete
            | PgState::Stale
            | PgState::Remapped
            | PgState::Inconsistent => false,
        }
    }

    /// Returns `true` only for `Active`.
    pub fn is_healthy(&self) -> bool {
        *self == PgState::Active
    }
}
/// Placement-group metadata tracked by an OSD.
#[derive(Debug, Clone)]
pub struct PlacementGroup {
pub pgid: u64,
pub pool_id: u64,
/// Current state; gates reads and writes in `Osd::handle_op`.
pub state: PgState,
/// OSD id of the primary.
pub primary: u64,
/// Acting set; `PlacementGroup::new` puts the primary first.
pub acting: Vec<u64>,
/// Up set; initialised as a copy of `acting`.
pub up: Vec<u64>,
pub epoch: u64,
pub last_peered_epoch: u64,
// The counters and timestamps below are initialised to 0 and not updated
// anywhere in this file.
pub object_count: u64,
pub total_bytes: u64,
pub last_scrub: u64,
pub last_deep_scrub: u64,
}
impl PlacementGroup {
pub fn new(pgid: u64, pool_id: u64, primary: u64, replicas: Vec<u64>) -> Self {
let mut acting = vec![primary];
acting.extend(replicas.iter());
let up = acting.clone();
Self {
pgid,
pool_id,
state: PgState::Creating,
primary,
acting,
up,
epoch: 0,
last_peered_epoch: 0,
object_count: 0,
total_bytes: 0,
last_scrub: 0,
last_deep_scrub: 0,
}
}
pub fn is_primary(&self, osd_id: u64) -> bool {
self.primary == osd_id
}
pub fn replicas(&self) -> Vec<u64> {
self.acting.iter().skip(1).copied().collect()
}
}
/// An object held in memory by a [`LocalStore`].
#[derive(Debug, Clone)]
pub struct LocalObject {
pub oid: u64,
/// Object payload.
pub data: Vec<u8>,
/// Starts at 1 and is incremented on every overwrite through `LocalStore::write`.
pub version: u64,
/// Modification time; set to 0 at creation and not updated in this file.
pub mtime: u64,
/// Extended attributes keyed by name.
pub xattrs: BTreeMap<String, Vec<u8>>,
/// BLAKE3 digest of `data` as of the last write.
pub checksum: [u8; 32],
}
impl LocalObject {
pub fn new(oid: u64, data: Vec<u8>) -> Self {
let checksum = Self::compute_checksum(&data);
Self {
oid,
data,
version: 1,
mtime: 0,
xattrs: BTreeMap::new(),
checksum,
}
}
pub fn compute_checksum(data: &[u8]) -> [u8; 32] {
let mut hasher = blake3::Hasher::new();
hasher.update(data);
*hasher.finalize().as_bytes()
}
pub fn verify(&self) -> bool {
self.checksum == Self::compute_checksum(&self.data)
}
pub fn size(&self) -> u64 {
self.data.len() as u64
}
}
/// In-memory object store for one placement group.
#[derive(Debug)]
pub struct LocalStore {
/// Objects keyed by object id (ordered, which `list` relies on).
objects: BTreeMap<u64, LocalObject>,
/// Sum of the data lengths of all stored objects.
total_bytes: u64,
/// Number of stored objects.
object_count: u64,
}
impl LocalStore {
/// Creates an empty store with zeroed counters.
pub fn new() -> Self {
Self {
objects: BTreeMap::new(),
total_bytes: 0,
object_count: 0,
}
}
/// Stores `data` as the full contents of object `oid`.
///
/// An existing object keeps its xattrs, gets its version bumped and its
/// checksum recomputed; otherwise a new version-1 object is created.
/// Returns the object's version after the write. Never fails in this
/// implementation.
pub fn write(&mut self, oid: u64, data: Vec<u8>) -> Result<u64, OsdError> {
    let incoming = data.len() as u64;
    match self.objects.get_mut(&oid) {
        Some(slot) => {
            // Retire the old payload's bytes before accounting the new ones.
            self.total_bytes -= slot.data.len() as u64;
            slot.data = data;
            slot.version += 1;
            slot.checksum = LocalObject::compute_checksum(&slot.data);
            self.total_bytes += incoming;
            Ok(slot.version)
        }
        None => {
            let fresh = LocalObject::new(oid, data);
            let version = fresh.version;
            self.objects.insert(oid, fresh);
            self.total_bytes += incoming;
            self.object_count += 1;
            Ok(version)
        }
    }
}
/// Reads up to `length` bytes of object `oid` starting at `offset`.
///
/// The range is clamped to the object's size: an `offset` at or beyond the
/// end yields an empty vector, and a `length` running past the end yields
/// only the available bytes. Arbitrarily large `offset`/`length` values are
/// safe (no overflow, no truncation on 32-bit targets).
///
/// # Errors
/// Returns `OsdError::ObjectNotFound` when the object does not exist.
pub fn read(&self, oid: u64, offset: u64, length: u64) -> Result<Vec<u8>, OsdError> {
    let obj = self
        .objects
        .get(&oid)
        .ok_or(OsdError::ObjectNotFound(oid))?;
    let len = obj.data.len();
    // Compare in u64 space so a huge offset is never narrowed before the check.
    if offset >= len as u64 {
        return Ok(Vec::new());
    }
    // Lossless: offset < len, and len is a usize.
    let start = offset as usize;
    let available = (len - start) as u64;
    // Lossless: the minimum is bounded by `available`, which came from a usize.
    let count = core::cmp::min(length, available) as usize;
    Ok(obj.data[start..start + count].to_vec())
}
/// Returns a copy of the whole payload of object `oid`, or
/// `OsdError::ObjectNotFound` when it does not exist.
pub fn read_full(&self, oid: u64) -> Result<Vec<u8>, OsdError> {
    match self.objects.get(&oid) {
        Some(obj) => Ok(obj.data.clone()),
        None => Err(OsdError::ObjectNotFound(oid)),
    }
}
/// Removes object `oid` and updates the byte and object counters, or
/// returns `OsdError::ObjectNotFound` when it does not exist.
pub fn delete(&mut self, oid: u64) -> Result<(), OsdError> {
    let removed = self
        .objects
        .remove(&oid)
        .ok_or(OsdError::ObjectNotFound(oid))?;
    self.total_bytes -= removed.data.len() as u64;
    self.object_count -= 1;
    Ok(())
}
/// Returns the metadata of object `oid` (id, size, mtime, version, stored
/// checksum), or `OsdError::ObjectNotFound` when it does not exist.
pub fn stat(&self, oid: u64) -> Result<ObjectStat, OsdError> {
    match self.objects.get(&oid) {
        None => Err(OsdError::ObjectNotFound(oid)),
        Some(found) => {
            let size = found.data.len() as u64;
            Ok(ObjectStat {
                oid: found.oid,
                size,
                mtime: found.mtime,
                version: found.version,
                checksum: found.checksum,
            })
        }
    }
}
/// Sets (or replaces) extended attribute `name` on object `oid`, or returns
/// `OsdError::ObjectNotFound` when the object does not exist.
pub fn set_xattr(&mut self, oid: u64, name: &str, value: Vec<u8>) -> Result<(), OsdError> {
    match self.objects.get_mut(&oid) {
        Some(target) => {
            target.xattrs.insert(String::from(name), value);
            Ok(())
        }
        None => Err(OsdError::ObjectNotFound(oid)),
    }
}
/// Returns a copy of extended attribute `name` of object `oid`.
///
/// NOTE(review): a missing attribute is reported with the same
/// `OsdError::ObjectNotFound(oid)` as a missing object, so callers cannot
/// tell the two cases apart.
pub fn get_xattr(&self, oid: u64, name: &str) -> Result<Vec<u8>, OsdError> {
    let value = self
        .objects
        .get(&oid)
        .and_then(|obj| obj.xattrs.get(name));
    match value {
        Some(bytes) => Ok(bytes.clone()),
        None => Err(OsdError::ObjectNotFound(oid)),
    }
}
/// Removes extended attribute `name` from object `oid`; removing an
/// attribute that is not set still succeeds. Returns
/// `OsdError::ObjectNotFound` when the object does not exist.
pub fn remove_xattr(&mut self, oid: u64, name: &str) -> Result<(), OsdError> {
    match self.objects.get_mut(&oid) {
        Some(target) => {
            target.xattrs.remove(name);
            Ok(())
        }
        None => Err(OsdError::ObjectNotFound(oid)),
    }
}
/// Lists at most `max` objects in ascending id order.
///
/// With `marker = Some(m)` only objects whose id is strictly greater than
/// `m` are returned, so the last id of one page can be passed as the marker
/// for the next page.
pub fn list(&self, max: usize, marker: Option<u64>) -> Vec<ObjectInfo> {
    use core::ops::Bound::{Excluded, Unbounded};

    let entries: Box<dyn Iterator<Item = _>> = match marker {
        Some(after) => Box::new(self.objects.range((Excluded(after), Unbounded))),
        None => Box::new(self.objects.iter()),
    };
    let mut page = Vec::new();
    for (&oid, obj) in entries.take(max) {
        page.push(ObjectInfo {
            oid,
            size: obj.data.len() as u64,
            version: obj.version,
        });
    }
    page
}
/// Returns the running total of stored payload bytes.
pub fn total_bytes(&self) -> u64 {
self.total_bytes
}
/// Returns the number of stored objects.
pub fn object_count(&self) -> u64 {
self.object_count
}
/// Re-hashes every object and returns, in ascending order, the ids of those
/// whose stored checksum no longer matches their data.
pub fn verify_all(&self) -> Vec<u64> {
    self.objects
        .iter()
        .filter(|(_, obj)| !obj.verify())
        .map(|(&oid, _)| oid)
        .collect()
}
}
impl Default for LocalStore {
/// Equivalent to [`LocalStore::new`]: an empty store.
fn default() -> Self {
Self::new()
}
}
/// Lifecycle state of an [`Osd`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OsdState {
/// Initial state set by `Osd::new`.
Booting,
/// Set by `Osd::boot`.
Active,
/// Transient state set at the start of `Osd::shutdown`.
Stopping,
/// Final state after `Osd::shutdown`.
Stopped,
/// Not assigned anywhere in this file.
Error,
}
/// Operation counters maintained by an [`Osd`]; all start at 0.
#[derive(Debug, Clone, Default)]
pub struct OsdStats {
/// Operations that reached execution in `handle_op`.
pub ops_total: u64,
/// `Read` operations attempted.
pub ops_read: u64,
/// `Write` and `WriteFull` operations attempted.
pub ops_write: u64,
/// Bytes returned by successful reads.
pub bytes_read: u64,
/// Payload bytes submitted by `Write` and `WriteFull`.
pub bytes_written: u64,
/// Not updated in this file.
pub ops_in_progress: u64,
/// Incremented by `Replicate` ops and by `primary_write` (once per replica).
pub replication_sent: u64,
/// Not updated in this file.
pub replication_received: u64,
/// `Pull` operations handled.
pub recovery_ops: u64,
/// `Scrub` operations handled.
pub scrub_ops: u64,
}
/// An object storage daemon: hosts placement groups, each backed by its own
/// in-memory [`LocalStore`].
pub struct Osd {
config: OsdConfig,
/// Copy of `config.id`.
id: u64,
state: OsdState,
/// Current map epoch; requests with an older epoch are rejected.
epoch: AtomicU64,
/// Hosted placement groups keyed by PG id.
pgs: BTreeMap<u64, PlacementGroup>,
/// One object store per hosted PG, keyed by PG id.
stores: BTreeMap<u64, LocalStore>,
stats: OsdStats,
/// `true` while this OSD is primary for at least one hosted PG.
is_primary: AtomicBool,
/// Replicated writes awaiting acknowledgements, keyed by request id.
pending_acks: BTreeMap<u64, PendingReplication>,
}
/// Bookkeeping for a primary write that is waiting for replica acknowledgements.
#[derive(Debug)]
struct PendingReplication {
request_id: u64,
pgid: u64,
oid: u64,
/// Replica OSD ids captured from the PG when the write was issued.
targets: Vec<u64>,
/// OSD ids that have acknowledged so far; seeded with the primary's own id.
acks: Vec<u64>,
/// Number of entries `acks` must reach for the write to count as committed.
required: usize,
/// Always set to 0 by `primary_write`.
timestamp: u64,
}
impl Osd {
pub fn new(config: OsdConfig) -> Self {
let id = config.id;
Self {
config,
id,
state: OsdState::Booting,
epoch: AtomicU64::new(0),
pgs: BTreeMap::new(),
stores: BTreeMap::new(),
stats: OsdStats::default(),
is_primary: AtomicBool::new(false),
pending_acks: BTreeMap::new(),
}
}
/// Returns this OSD's id.
pub fn id(&self) -> u64 {
self.id
}
/// Returns the current lifecycle state.
pub fn state(&self) -> OsdState {
self.state
}
/// Returns the current map epoch (sequentially consistent load).
pub fn epoch(&self) -> u64 {
self.epoch.load(Ordering::SeqCst)
}
/// Overwrites the current map epoch (sequentially consistent store).
/// No monotonicity check is performed.
pub fn set_epoch(&self, epoch: u64) {
self.epoch.store(epoch, Ordering::SeqCst);
}
/// Returns the operation counters.
pub fn stats(&self) -> &OsdStats {
&self.stats
}
/// Marks the OSD `Active`. Always succeeds in this implementation.
pub fn boot(&mut self) -> Result<(), OsdError> {
self.state = OsdState::Active;
Ok(())
}
/// Moves the OSD to `Stopped`.
pub fn shutdown(&mut self) {
// `Stopping` is overwritten immediately; no work happens between the two
// assignments, so the intermediate state is never observable.
self.state = OsdState::Stopping;
self.state = OsdState::Stopped;
}
/// Registers `pg` (replacing any PG with the same id) and makes sure an
/// object store exists for it; an existing store is kept as-is.
/// Sets the primary flag when this OSD is the PG's primary.
pub fn add_pg(&mut self, pg: PlacementGroup) {
    let pgid = pg.pgid;
    let owned_here = pg.is_primary(self.id);
    self.pgs.insert(pgid, pg);
    self.stores.entry(pgid).or_default();
    if owned_here {
        self.is_primary.store(true, Ordering::SeqCst);
    }
}
/// Drops PG `pgid` together with its object store, then recomputes the
/// primary flag from the PGs that remain.
pub fn remove_pg(&mut self, pgid: u64) {
    self.pgs.remove(&pgid);
    self.stores.remove(&pgid);

    let me = self.id;
    let mut still_primary = false;
    for remaining in self.pgs.values() {
        if remaining.is_primary(me) {
            still_primary = true;
            break;
        }
    }
    self.is_primary.store(still_primary, Ordering::SeqCst);
}
/// Returns the hosted PG with id `pgid`, if any.
pub fn get_pg(&self, pgid: u64) -> Option<&PlacementGroup> {
self.pgs.get(&pgid)
}
/// Returns a mutable reference to the hosted PG with id `pgid`, if any.
/// Changes to `primary` made through it do not refresh the primary flag.
pub fn get_pg_mut(&mut self, pgid: u64) -> Option<&mut PlacementGroup> {
self.pgs.get_mut(&pgid)
}
pub fn handle_op(&mut self, msg: OsdMessage) -> OsdResponse {
let request_id = msg.request_id;
let current_epoch = self.epoch();
if msg.epoch < current_epoch && !msg.flags.ignore_state {
return OsdResponse {
request_id,
result: OsdResult::Error(OsdError::StaleEpoch {
expected: current_epoch,
got: msg.epoch,
}),
epoch: current_epoch,
};
}
let pg = match self.pgs.get(&msg.pgid) {
Some(pg) => pg,
None => {
return OsdResponse {
request_id,
result: OsdResult::Error(OsdError::PgNotFound(msg.pgid)),
epoch: current_epoch,
};
}
};
let is_write = matches!(
msg.op,
OsdOp::Write { .. }
| OsdOp::WriteFull { .. }
| OsdOp::Delete { .. }
| OsdOp::Truncate { .. }
);
if is_write && !pg.state.can_write() && !msg.flags.ignore_state {
return OsdResponse {
request_id,
result: OsdResult::Error(OsdError::PgNotActive(msg.pgid, pg.state)),
epoch: current_epoch,
};
}
if !is_write && !pg.state.can_read() && !msg.flags.ignore_state {
return OsdResponse {
request_id,
result: OsdResult::Error(OsdError::PgNotActive(msg.pgid, pg.state)),
epoch: current_epoch,
};
}
let result = self.execute_op(msg.pgid, msg.op, msg.flags);
self.stats.ops_total += 1;
OsdResponse {
request_id,
result,
epoch: current_epoch,
}
}
fn execute_op(&mut self, pgid: u64, op: OsdOp, _flags: OsdOpFlags) -> OsdResult {
let store = match self.stores.get_mut(&pgid) {
Some(s) => s,
None => return OsdResult::Error(OsdError::PgNotFound(pgid)),
};
match op {
OsdOp::Read {
oid,
offset,
length,
} => {
self.stats.ops_read += 1;
match store.read(oid, offset, length) {
Ok(data) => {
self.stats.bytes_read += data.len() as u64;
OsdResult::Success(Some(data))
}
Err(e) => OsdResult::Error(e),
}
}
OsdOp::Write { oid, offset, data } => {
self.stats.ops_write += 1;
self.stats.bytes_written += data.len() as u64;
let existing = store.read_full(oid).unwrap_or_default();
let mut new_data = existing;
let end = offset as usize + data.len();
if new_data.len() < end {
new_data.resize(end, 0);
}
new_data[offset as usize..end].copy_from_slice(&data);
match store.write(oid, new_data) {
Ok(_) => OsdResult::Success(None),
Err(e) => OsdResult::Error(e),
}
}
OsdOp::WriteFull { oid, data } => {
self.stats.ops_write += 1;
self.stats.bytes_written += data.len() as u64;
match store.write(oid, data) {
Ok(_) => OsdResult::Success(None),
Err(e) => OsdResult::Error(e),
}
}
OsdOp::Delete { oid } => match store.delete(oid) {
Ok(_) => OsdResult::Success(None),
Err(e) => OsdResult::Error(e),
},
OsdOp::Truncate { oid, size } => match store.read_full(oid) {
Ok(mut data) => {
data.truncate(size as usize);
match store.write(oid, data) {
Ok(_) => OsdResult::Success(None),
Err(e) => OsdResult::Error(e),
}
}
Err(e) => OsdResult::Error(e),
},
OsdOp::Stat { oid } => match store.stat(oid) {
Ok(stat) => OsdResult::Stat(stat),
Err(e) => OsdResult::Error(e),
},
OsdOp::SetXattr { oid, name, value } => match store.set_xattr(oid, &name, value) {
Ok(_) => OsdResult::Success(None),
Err(e) => OsdResult::Error(e),
},
OsdOp::GetXattr { oid, name } => match store.get_xattr(oid, &name) {
Ok(value) => OsdResult::Xattr(value),
Err(e) => OsdResult::Error(e),
},
OsdOp::RemoveXattr { oid, name } => match store.remove_xattr(oid, &name) {
Ok(_) => OsdResult::Success(None),
Err(e) => OsdResult::Error(e),
},
OsdOp::List { max, marker } => {
let objects = store.list(max, marker);
OsdResult::List(objects)
}
OsdOp::Replicate { oid, target_osd: _ } => {
self.stats.replication_sent += 1;
match store.read_full(oid) {
Ok(_data) => {
OsdResult::Success(None)
}
Err(e) => OsdResult::Error(e),
}
}
OsdOp::Pull {
oid: _,
source_osd: _,
} => {
self.stats.recovery_ops += 1;
OsdResult::Success(None)
}
OsdOp::Scrub { oid } => {
self.stats.scrub_ops += 1;
let corrupt = if let Some(oid) = oid {
match store.objects.get(&oid) {
Some(obj) if !obj.verify() => vec![oid],
_ => vec![],
}
} else {
store.verify_all()
};
if corrupt.is_empty() {
OsdResult::Success(None)
} else {
let oid = corrupt[0];
let obj = store.objects.get(&oid).unwrap();
OsdResult::Error(OsdError::ChecksumMismatch {
oid,
expected: obj.checksum,
got: LocalObject::compute_checksum(&obj.data),
})
}
}
}
}
/// Writes `data` as the full contents of object `oid` on this OSD acting as
/// primary of PG `pgid`, and records the replica acknowledgements that are
/// still outstanding.
///
/// With no replicas the call completes immediately. Otherwise an entry is
/// stored under `request_id` (replacing any previous entry with that id)
/// and is resolved through `handle_ack`. The primary counts as the first
/// acknowledgement; the required count is a strict majority of
/// primary + replicas.
///
/// The PG's state is not checked here.
///
/// # Errors
/// * `OsdError::PgNotFound` - the PG or its store is not hosted here.
/// * `OsdError::NotPrimary` - this OSD is not the PG's primary.
pub fn primary_write(
    &mut self,
    pgid: u64,
    oid: u64,
    data: Vec<u8>,
    request_id: u64,
) -> Result<(), OsdError> {
    let pg = self.pgs.get(&pgid).ok_or(OsdError::PgNotFound(pgid))?;
    if !pg.is_primary(self.id) {
        return Err(OsdError::NotPrimary(pgid));
    }
    let replicas = pg.replicas();

    let store = self
        .stores
        .get_mut(&pgid)
        .ok_or(OsdError::PgNotFound(pgid))?;
    // `data` is not needed afterwards, so hand it over without copying.
    store.write(oid, data)?;

    if replicas.is_empty() {
        return Ok(());
    }

    let fanout = replicas.len();
    let pending = PendingReplication {
        request_id,
        pgid,
        oid,
        targets: replicas,
        // The local write above is the primary's own acknowledgement.
        acks: vec![self.id],
        required: fanout.div_ceil(2) + 1,
        timestamp: 0,
    };
    self.pending_acks.insert(request_id, pending);
    self.stats.replication_sent += fanout as u64;
    Ok(())
}
/// Records a replication acknowledgement from `from_osd` for `request_id`.
///
/// Returns `true` exactly once per request: on the acknowledgement that
/// brings the count up to the required quorum, at which point the pending
/// entry is dropped. Returns `false` for unknown request ids, for duplicate
/// acknowledgements, and for acknowledgements from OSDs that are not
/// replication targets of the request (those are ignored and never count
/// toward the quorum).
pub fn handle_ack(&mut self, request_id: u64, from_osd: u64) -> bool {
    let Some(pending) = self.pending_acks.get_mut(&request_id) else {
        return false;
    };

    let is_target = pending.targets.contains(&from_osd);
    if is_target && !pending.acks.contains(&from_osd) {
        pending.acks.push(from_osd);
    }

    if pending.acks.len() < pending.required {
        return false;
    }
    self.pending_acks.remove(&request_id);
    true
}
/// Returns `(object_count, total_bytes)` of PG `pgid`'s store, or `None`
/// when no store exists for that PG.
pub fn pg_stats(&self, pgid: u64) -> Option<(u64, u64)> {
    let store = self.stores.get(&pgid)?;
    Some((store.object_count(), store.total_bytes()))
}
/// Returns the ids of all hosted PGs in ascending order.
pub fn pg_ids(&self) -> Vec<u64> {
    let mut ids = Vec::with_capacity(self.pgs.len());
    ids.extend(self.pgs.keys().copied());
    ids
}
/// Sets the state of PG `pgid`; does nothing when the PG is not hosted here.
pub fn set_pg_state(&mut self, pgid: u64, state: PgState) {
    let Some(target) = self.pgs.get_mut(&pgid) else {
        return;
    };
    target.state = state;
}
}
/// Transport abstraction for OSD-to-OSD messaging. Implementations must be
/// `Send + Sync`. Nothing in this file calls these methods; the exact wire
/// semantics are defined by the implementations.
pub trait OsdNetwork: Send + Sync {
/// Sends `message` to `target_osd` and returns the reply bytes.
fn send(&self, target_osd: u64, message: &[u8]) -> Result<Vec<u8>, OsdError>;
/// Sends `message` to multiple OSDs and returns one `(osd_id, result)` pair
/// per recipient.
fn broadcast(&self, message: &[u8]) -> Vec<(u64, Result<Vec<u8>, OsdError>)>;
/// Starts accepting incoming messages (presumably; confirm against implementations).
fn listen(&self) -> Result<(), OsdError>;
}
/// [`OsdNetwork`] implementation that performs no I/O: every call succeeds
/// and returns empty data.
#[derive(Debug, Default)]
pub struct NoOpOsdNetwork;
impl OsdNetwork for NoOpOsdNetwork {
/// Ignores the message and returns an empty reply.
fn send(&self, _target: u64, _message: &[u8]) -> Result<Vec<u8>, OsdError> {
Ok(Vec::new())
}
/// Ignores the message and reports no recipients.
fn broadcast(&self, _message: &[u8]) -> Vec<(u64, Result<Vec<u8>, OsdError>)> {
Vec::new()
}
/// Does nothing and reports success.
fn listen(&self) -> Result<(), OsdError> {
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Boots an OSD with id 0 and otherwise default configuration.
    fn create_test_osd() -> Osd {
        let mut osd = Osd::new(OsdConfig {
            id: 0,
            ..Default::default()
        });
        osd.boot().unwrap();
        osd
    }

    /// Booted OSD 0 hosting PG 1 (pool 0, primary 0) with the given replicas;
    /// the PG is left in its initial state.
    fn osd_with_pg(replicas: Vec<u64>) -> Osd {
        let mut osd = create_test_osd();
        osd.add_pg(PlacementGroup::new(1, 0, 0, replicas));
        osd
    }

    /// Like `osd_with_pg(vec![])`, with PG 1 forced to `Active`.
    fn active_osd() -> Osd {
        let mut osd = osd_with_pg(vec![]);
        osd.set_pg_state(1, PgState::Active);
        osd
    }

    /// Epoch-0 request for PG 1 with default flags and no source OSD.
    fn request(request_id: u64, op: OsdOp) -> OsdMessage {
        OsdMessage {
            epoch: 0,
            pgid: 1,
            op,
            request_id,
            source_osd: None,
            flags: OsdOpFlags::default(),
        }
    }

    fn write_full(oid: u64, data: &[u8]) -> OsdOp {
        OsdOp::WriteFull {
            oid,
            data: data.to_vec(),
        }
    }

    fn read(oid: u64, offset: u64, length: u64) -> OsdOp {
        OsdOp::Read {
            oid,
            offset,
            length,
        }
    }

    #[test]
    fn test_osd_creation() {
        let osd = create_test_osd();
        assert_eq!(osd.id(), 0);
        assert_eq!(osd.state(), OsdState::Active);
    }

    #[test]
    fn test_add_pg() {
        let osd = osd_with_pg(vec![1, 2]);
        assert!(osd.get_pg(1).is_some());
        assert!(osd.is_primary.load(Ordering::SeqCst));
    }

    #[test]
    fn test_write_read() {
        let mut osd = active_osd();
        let resp = osd.handle_op(request(1, write_full(100, b"Hello, LCPFS!")));
        assert!(matches!(resp.result, OsdResult::Success(_)));

        let resp = osd.handle_op(request(2, read(100, 0, 100)));
        match resp.result {
            OsdResult::Success(Some(data)) => {
                assert_eq!(data, b"Hello, LCPFS!");
            }
            _ => panic!("Expected success with data"),
        }
    }

    #[test]
    fn test_partial_read() {
        let mut osd = active_osd();
        osd.handle_op(request(1, write_full(100, b"Hello, World!")));

        let resp = osd.handle_op(request(2, read(100, 7, 5)));
        match resp.result {
            OsdResult::Success(Some(data)) => {
                assert_eq!(data, b"World");
            }
            _ => panic!("Expected success with data"),
        }
    }

    #[test]
    fn test_delete() {
        let mut osd = active_osd();
        osd.handle_op(request(1, write_full(100, b"test")));

        let resp = osd.handle_op(request(2, OsdOp::Delete { oid: 100 }));
        assert!(matches!(resp.result, OsdResult::Success(_)));

        let resp = osd.handle_op(request(3, read(100, 0, 100)));
        assert!(matches!(
            resp.result,
            OsdResult::Error(OsdError::ObjectNotFound(100))
        ));
    }

    #[test]
    fn test_stat() {
        let mut osd = active_osd();
        let data = b"test data for stat";
        osd.handle_op(request(1, write_full(100, data)));

        let resp = osd.handle_op(request(2, OsdOp::Stat { oid: 100 }));
        match resp.result {
            OsdResult::Stat(stat) => {
                assert_eq!(stat.oid, 100);
                assert_eq!(stat.size, data.len() as u64);
                assert_eq!(stat.version, 1);
            }
            _ => panic!("Expected stat result"),
        }
    }

    #[test]
    fn test_xattrs() {
        let mut osd = active_osd();
        osd.handle_op(request(1, write_full(100, b"data")));
        osd.handle_op(request(
            2,
            OsdOp::SetXattr {
                oid: 100,
                name: "user.test".to_string(),
                value: b"xattr value".to_vec(),
            },
        ));

        let resp = osd.handle_op(request(
            3,
            OsdOp::GetXattr {
                oid: 100,
                name: "user.test".to_string(),
            },
        ));
        match resp.result {
            OsdResult::Xattr(value) => {
                assert_eq!(value, b"xattr value");
            }
            _ => panic!("Expected xattr result"),
        }
    }

    #[test]
    fn test_list() {
        let mut osd = active_osd();
        for i in 0..5u64 {
            let payload = vec![i as u8; 100];
            osd.handle_op(request(i, write_full(100 + i, &payload)));
        }

        let resp = osd.handle_op(request(
            100,
            OsdOp::List {
                max: 10,
                marker: None,
            },
        ));
        match resp.result {
            OsdResult::List(objects) => {
                assert_eq!(objects.len(), 5);
            }
            _ => panic!("Expected list result"),
        }
    }

    #[test]
    fn test_pg_state_check() {
        let mut osd = osd_with_pg(vec![]);
        osd.set_pg_state(1, PgState::Peering);

        let resp = osd.handle_op(request(1, write_full(100, b"test")));
        assert!(matches!(
            resp.result,
            OsdResult::Error(OsdError::PgNotActive(1, PgState::Peering))
        ));
    }

    #[test]
    fn test_stale_epoch() {
        let mut osd = active_osd();
        osd.set_epoch(10);

        let msg = OsdMessage {
            epoch: 5,
            ..request(1, OsdOp::Stat { oid: 100 })
        };
        let resp = osd.handle_op(msg);
        match resp.result {
            OsdResult::Error(OsdError::StaleEpoch {
                expected: 10,
                got: 5,
            }) => {}
            _ => panic!("Expected stale epoch error"),
        }
    }

    #[test]
    fn test_primary_write() {
        let mut osd = osd_with_pg(vec![1, 2]);
        osd.set_pg_state(1, PgState::Active);

        let result = osd.primary_write(1, 100, b"replicated data".to_vec(), 1);
        assert!(result.is_ok());

        let (count, _) = osd.pg_stats(1).unwrap();
        assert_eq!(count, 1);
    }

    #[test]
    fn test_local_store() {
        let mut store = LocalStore::new();
        store.write(1, b"hello".to_vec()).unwrap();
        store.write(2, b"world".to_vec()).unwrap();
        assert_eq!(store.object_count(), 2);
        assert_eq!(store.total_bytes(), 10);

        let data = store.read_full(1).unwrap();
        assert_eq!(data, b"hello");

        store.delete(1).unwrap();
        assert_eq!(store.object_count(), 1);
        assert!(store.verify_all().is_empty());
    }

    #[test]
    fn test_pg_state_transitions() {
        assert!(PgState::Active.can_read());
        assert!(PgState::Active.can_write());
        assert!(PgState::Active.is_healthy());

        assert!(PgState::Degraded.can_read());
        assert!(PgState::Degraded.can_write());
        assert!(!PgState::Degraded.is_healthy());

        assert!(!PgState::Peering.can_read());
        assert!(!PgState::Peering.can_write());

        assert!(PgState::Recovering.can_read());
        assert!(PgState::Recovering.can_write());
    }

    #[test]
    fn test_object_checksum() {
        let obj = LocalObject::new(1, b"test data".to_vec());
        assert!(obj.verify());

        let mut obj2 = obj.clone();
        obj2.data[0] = 0xFF;
        assert!(!obj2.verify());
    }

    #[test]
    fn test_scrub() {
        let mut osd = active_osd();
        osd.handle_op(request(1, write_full(100, b"scrub test")));

        let resp = osd.handle_op(request(2, OsdOp::Scrub { oid: Some(100) }));
        assert!(matches!(resp.result, OsdResult::Success(_)));
    }
}