#![cfg_attr(not(feature = "std"), no_std)]
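//! Cluster coordination layer: maintains the OSD map and CRUSH map, detects
//! OSD failures via heartbeats, schedules PG recoveries, and exposes a
//! read-only [`ClusterClient`] for object-to-PG placement lookups.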
extern crate alloc;
use alloc::collections::BTreeMap;
use alloc::format;
use alloc::string::{String, ToString};
use alloc::vec;
use alloc::vec::Vec;
use core::fmt;
#[cfg(feature = "std")]
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
#[cfg(not(feature = "std"))]
use core::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use super::crush::{BucketType, CrushError, CrushMap};
use super::mds::{Mds, MdsConfig, MdsError};
use super::osd::{Osd, OsdConfig, OsdError, OsdState, PgState, PlacementGroup};
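/// Tunable parameters for a cluster: replication, PG count, heartbeat
/// timing, and recovery/scrub behaviour.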
#[derive(Debug, Clone)]
pub struct ClusterConfig {
pub cluster_name: String,
pub cluster_uuid: [u8; 16],
pub replication_factor: usize,
pub pg_count: u64,
pub min_osds: usize,
pub failure_domain: BucketType,
pub heartbeat_interval_ms: u64,
pub heartbeat_timeout_ms: u64,
pub recovery_max_bandwidth: u64,
pub scrub_interval_secs: u64,
pub auto_recovery: bool,
}
impl Default for ClusterConfig {
fn default() -> Self {
Self {
cluster_name: "lcpfs".to_string(),
cluster_uuid: [0; 16],
replication_factor: 3,
pg_count: 256,
min_osds: 3,
failure_domain: BucketType::Host,
heartbeat_interval_ms: 1000,
heartbeat_timeout_ms: 20000,
recovery_max_bandwidth: 0,
scrub_interval_secs: 86400,
auto_recovery: true,
}
}
}
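/// Per-OSD entry in the [`OsdMap`]: address, state, CRUSH weight,
/// capacity/usage counters, and the epochs at which it was marked in/up.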
#[derive(Debug, Clone)]
pub struct OsdInfo {
pub id: u64,
pub addr: String,
pub state: OsdNodeState,
pub weight: f64,
pub failure_domain: String,
pub capacity: u64,
pub used: u64,
pub last_heartbeat: u64,
pub in_epoch: u64,
pub up_epoch: u64,
}
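/// Lifecycle state of an OSD as seen by the cluster map; only `Up` OSDs
/// count as available for placement.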
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OsdNodeState {
Up,
Down,
Joining,
Leaving,
Out,
Destroyed,
}
impl OsdNodeState {
pub fn is_available(&self) -> bool {
matches!(self, OsdNodeState::Up)
}
}
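/// Epoch-versioned view of cluster membership: every known OSD, the
/// up/in sets, the pools, and the PG-to-OSD mappings.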
#[derive(Debug, Clone)]
pub struct OsdMap {
pub epoch: u64,
pub osds: BTreeMap<u64, OsdInfo>,
pub up_osds: Vec<u64>,
pub in_osds: Vec<u64>,
pub pools: BTreeMap<u64, PoolConfig>,
pub pg_map: BTreeMap<u64, PgMapping>,
pub flags: ClusterFlags,
}
#[derive(Debug, Clone)]
pub struct PoolConfig {
pub id: u64,
pub name: String,
pub size: usize,
pub min_size: usize,
pub pg_count: u64,
pub crush_rule: String,
}
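/// Mapping of one placement group to its acting and up OSD sets, with the
/// primary OSD and the current PG state.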
#[derive(Debug, Clone)]
pub struct PgMapping {
pub pgid: u64,
pub pool_id: u64,
pub acting: Vec<u64>,
pub up: Vec<u64>,
pub primary: u64,
pub state: PgState,
pub last_active_epoch: u64,
}
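/// Cluster-wide override flags (pause reads/writes, suppress down/out
/// transitions, disable recovery, backfill, and scrubbing).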
#[derive(Debug, Clone, Copy, Default)]
pub struct ClusterFlags {
pub pauserd: bool,
pub pausewr: bool,
pub nodown: bool,
pub noout: bool,
pub norecover: bool,
pub nobackfill: bool,
pub noscrub: bool,
pub nodeep_scrub: bool,
}
impl OsdMap {
pub fn new() -> Self {
Self {
epoch: 0,
osds: BTreeMap::new(),
up_osds: Vec::new(),
in_osds: Vec::new(),
pools: BTreeMap::new(),
pg_map: BTreeMap::new(),
flags: ClusterFlags::default(),
}
}
pub fn add_osd(&mut self, osd: OsdInfo) {
let id = osd.id;
if osd.state.is_available() && !self.up_osds.contains(&id) {
self.up_osds.push(id);
}
if !self.in_osds.contains(&id) {
self.in_osds.push(id);
}
self.osds.insert(id, osd);
self.epoch += 1;
}
pub fn mark_down(&mut self, osd_id: u64) {
if let Some(osd) = self.osds.get_mut(&osd_id) {
osd.state = OsdNodeState::Down;
self.up_osds.retain(|&id| id != osd_id);
self.epoch += 1;
}
}
pub fn mark_up(&mut self, osd_id: u64) {
if let Some(osd) = self.osds.get_mut(&osd_id) {
osd.state = OsdNodeState::Up;
osd.up_epoch = self.epoch + 1;
if !self.up_osds.contains(&osd_id) {
self.up_osds.push(osd_id);
}
self.epoch += 1;
}
}
pub fn mark_out(&mut self, osd_id: u64) {
if let Some(osd) = self.osds.get_mut(&osd_id) {
osd.state = OsdNodeState::Out;
self.in_osds.retain(|&id| id != osd_id);
self.epoch += 1;
}
}
pub fn get_osd(&self, osd_id: u64) -> Option<&OsdInfo> {
self.osds.get(&osd_id)
}
pub fn up_count(&self) -> usize {
self.up_osds.len()
}
pub fn in_count(&self) -> usize {
self.in_osds.len()
}
pub fn create_pool(&mut self, name: &str, pg_count: u64, size: usize) -> u64 {
let id = self.pools.len() as u64;
let pool = PoolConfig {
id,
name: name.to_string(),
size,
            min_size: size.div_ceil(2),
            pg_count,
crush_rule: "replicated_rule".to_string(),
};
self.pools.insert(id, pool);
self.epoch += 1;
id
}
}
impl Default for OsdMap {
fn default() -> Self {
Self::new()
}
}
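/// Heartbeat-based failure detector: records the last heartbeat timestamp
/// per OSD and reports OSDs whose silence exceeds `timeout_ms`.
/// `interval_ms` is the expected heartbeat cadence for the caller driving it.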
#[derive(Debug)]
pub struct FailureDetector {
interval_ms: u64,
timeout_ms: u64,
last_seen: BTreeMap<u64, u64>,
failed: Vec<u64>,
}
impl FailureDetector {
pub fn new(interval_ms: u64, timeout_ms: u64) -> Self {
Self {
interval_ms,
timeout_ms,
last_seen: BTreeMap::new(),
failed: Vec::new(),
}
}
pub fn heartbeat(&mut self, osd_id: u64, timestamp: u64) {
self.last_seen.insert(osd_id, timestamp);
self.failed.retain(|&id| id != osd_id);
}
pub fn check(&mut self, current_time: u64) -> Vec<u64> {
let mut newly_failed = Vec::new();
for (&osd_id, &last_time) in &self.last_seen {
if current_time > last_time + self.timeout_ms && !self.failed.contains(&osd_id) {
newly_failed.push(osd_id);
}
}
        // Everything in `newly_failed` was already filtered against `failed` above.
        self.failed.extend_from_slice(&newly_failed);
newly_failed
}
pub fn get_failed(&self) -> &[u64] {
&self.failed
}
pub fn clear_failure(&mut self, osd_id: u64) {
self.failed.retain(|&id| id != osd_id);
}
pub fn remove(&mut self, osd_id: u64) {
self.last_seen.remove(&osd_id);
self.failed.retain(|&id| id != osd_id);
}
}
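/// Progress record for one PG recovery: source and target OSDs plus
/// object and byte counters.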
#[derive(Debug, Clone)]
pub struct RecoveryState {
pub pgid: u64,
pub source_osd: u64,
pub target_osd: u64,
pub objects_total: u64,
pub objects_done: u64,
pub bytes_total: u64,
pub bytes_done: u64,
pub started: u64,
pub state: RecoveryOpState,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecoveryOpState {
Queued,
InProgress,
Completed,
Failed,
Cancelled,
}
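/// Schedules PG recoveries with a bound on how many run concurrently;
/// excess requests wait in a FIFO queue.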
#[derive(Debug)]
pub struct RecoveryManager {
max_concurrent: usize,
active: BTreeMap<u64, RecoveryState>,
queue: Vec<RecoveryState>,
completed: u64,
failed: u64,
}
impl RecoveryManager {
pub fn new(max_concurrent: usize) -> Self {
Self {
max_concurrent,
active: BTreeMap::new(),
queue: Vec::new(),
completed: 0,
failed: 0,
}
}
pub fn queue_recovery(&mut self, pgid: u64, source_osd: u64, target_osd: u64) {
if self.active.contains_key(&pgid) {
return;
}
if self.queue.iter().any(|r| r.pgid == pgid) {
return;
}
let recovery = RecoveryState {
pgid,
source_osd,
target_osd,
objects_total: 0,
objects_done: 0,
bytes_total: 0,
bytes_done: 0,
started: 0,
state: RecoveryOpState::Queued,
};
self.queue.push(recovery);
}
pub fn start_recoveries(&mut self, current_time: u64) -> Vec<u64> {
let mut started = Vec::new();
while self.active.len() < self.max_concurrent && !self.queue.is_empty() {
let mut recovery = self.queue.remove(0);
recovery.state = RecoveryOpState::InProgress;
recovery.started = current_time;
started.push(recovery.pgid);
self.active.insert(recovery.pgid, recovery);
}
started
}
    pub fn complete(&mut self, pgid: u64) {
        // The recovery record is dropped here; only the completion counter is kept.
        if self.active.remove(&pgid).is_some() {
            self.completed += 1;
        }
    }
    pub fn fail(&mut self, pgid: u64) {
        if let Some(mut recovery) = self.active.remove(&pgid) {
            self.failed += 1;
            // Re-queue the failed recovery so a later scheduling pass retries it.
            recovery.state = RecoveryOpState::Queued;
            self.queue.push(recovery);
        }
    }
pub fn update_progress(&mut self, pgid: u64, objects_done: u64, bytes_done: u64) {
if let Some(recovery) = self.active.get_mut(&pgid) {
recovery.objects_done = objects_done;
recovery.bytes_done = bytes_done;
}
}
pub fn active_count(&self) -> usize {
self.active.len()
}
pub fn queued_count(&self) -> usize {
self.queue.len()
}
pub fn stats(&self) -> (u64, u64, usize, usize) {
(
self.completed,
self.failed,
self.active.len(),
self.queue.len(),
)
}
}
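/// Read-only client view over a snapshot of the CRUSH map and OSD map,
/// used to map objects to PGs and PGs to OSDs.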
pub struct ClusterClient {
config: ClusterConfig,
crush_map: CrushMap,
osd_map: OsdMap,
request_id: AtomicU64,
}
impl ClusterClient {
pub fn new(config: ClusterConfig, crush_map: CrushMap, osd_map: OsdMap) -> Self {
Self {
config,
crush_map,
osd_map,
request_id: AtomicU64::new(1),
}
}
fn next_request_id(&self) -> u64 {
self.request_id.fetch_add(1, Ordering::SeqCst)
}
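    /// Maps an object id to a PG within `pool_id` by multiplying with a
    /// Fibonacci-hashing constant and reducing modulo the pool's PG count.
    /// Unknown pools fall back to PG 0.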
pub fn object_to_pg(&self, pool_id: u64, oid: u64) -> u64 {
let pool = match self.osd_map.pools.get(&pool_id) {
Some(p) => p,
None => return 0,
};
let hash = oid.wrapping_mul(0x9e3779b97f4a7c15);
hash % pool.pg_count
}
pub fn get_pg_osds(&self, pool_id: u64, pgid: u64) -> Result<Vec<u64>, ClusterError> {
let pool = self
.osd_map
.pools
.get(&pool_id)
.ok_or(ClusterError::PoolNotFound(pool_id))?;
self.crush_map
.select(&pool.crush_rule, pgid, pool.size)
.map_err(ClusterError::CrushError)
}
pub fn get_primary(&self, pool_id: u64, pgid: u64) -> Result<u64, ClusterError> {
let osds = self.get_pg_osds(pool_id, pgid)?;
osds.first()
.copied()
.ok_or(ClusterError::NoPrimaryAvailable)
}
pub fn epoch(&self) -> u64 {
self.osd_map.epoch
}
pub fn is_healthy(&self) -> bool {
self.osd_map.up_count() >= self.config.min_osds
}
}
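/// Coarse health state of the whole cluster, derived from the up-OSD count,
/// active recoveries, and degraded PGs.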
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClusterState {
Initializing,
Active,
Degraded,
Recovering,
Offline,
}
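/// Central coordinator for the cluster: owns the CRUSH map, OSD map,
/// failure detector, recovery manager, and any locally hosted OSD/MDS
/// instances.
///
/// A rough usage sketch:
///
/// ```ignore
/// let mut cluster = ClusterManager::new(ClusterConfig::default());
/// cluster.init_cluster()?;
/// cluster.add_osd(0, "127.0.0.1:6800", 1.0)?;
/// let now_ms = 1_000; // caller-supplied millisecond timestamp
/// cluster.heartbeat(0, now_ms);
/// let _events = cluster.check_health(now_ms);
/// ```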
pub struct ClusterManager {
config: ClusterConfig,
state: ClusterState,
crush_map: CrushMap,
osd_map: OsdMap,
failure_detector: FailureDetector,
recovery_manager: RecoveryManager,
local_osds: BTreeMap<u64, Osd>,
local_mds: Option<Mds>,
epoch: AtomicU64,
running: AtomicBool,
}
impl ClusterManager {
pub fn new(config: ClusterConfig) -> Self {
let crush_map = CrushMap::new();
let osd_map = OsdMap::new();
let failure_detector =
FailureDetector::new(config.heartbeat_interval_ms, config.heartbeat_timeout_ms);
let recovery_manager = RecoveryManager::new(4);
Self {
config,
state: ClusterState::Initializing,
crush_map,
osd_map,
failure_detector,
recovery_manager,
local_osds: BTreeMap::new(),
local_mds: None,
epoch: AtomicU64::new(0),
running: AtomicBool::new(false),
}
}
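    /// Creates the default pool, builds a simple CRUSH map, and instantiates
    /// a local MDS; the cluster stays in `Initializing` until enough OSDs join.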
pub fn init_cluster(&mut self) -> Result<(), ClusterError> {
self.osd_map.create_pool(
"default",
self.config.pg_count,
self.config.replication_factor,
);
self.crush_map = CrushMap::simple(0);
let mds_config = MdsConfig {
pg_count: self.config.pg_count,
..Default::default()
};
self.local_mds = Some(Mds::new(mds_config));
self.state = ClusterState::Initializing;
self.running.store(true, Ordering::SeqCst);
Ok(())
}
pub fn add_osd(&mut self, osd_id: u64, addr: &str, weight: f64) -> Result<(), ClusterError> {
let osd_info = OsdInfo {
id: osd_id,
addr: addr.to_string(),
state: OsdNodeState::Up,
weight,
failure_domain: format!("host-{}", osd_id),
            capacity: 1024 * 1024 * 1024 * 100, // 100 GiB default capacity
            used: 0,
last_heartbeat: 0,
in_epoch: self.osd_map.epoch,
up_epoch: self.osd_map.epoch,
};
self.osd_map.add_osd(osd_info.clone());
self.crush_map
.add_osd(osd_id, &osd_info.failure_domain, weight);
let osd_config = OsdConfig {
id: osd_id,
cluster_name: self.config.cluster_name.clone(),
..Default::default()
};
let mut osd = Osd::new(osd_config);
osd.boot().map_err(ClusterError::OsdError)?;
self.local_osds.insert(osd_id, osd);
self.epoch.fetch_add(1, Ordering::SeqCst);
if self.osd_map.up_count() >= self.config.min_osds {
self.state = ClusterState::Active;
self.assign_pgs()?;
}
Ok(())
}
pub fn remove_osd(&mut self, osd_id: u64, force: bool) -> Result<(), ClusterError> {
if !force && self.osd_map.up_count() <= self.config.min_osds {
return Err(ClusterError::InsufficientOsds);
}
self.osd_map.mark_out(osd_id);
self.failure_detector.remove(osd_id);
if let Some(mut osd) = self.local_osds.remove(&osd_id) {
osd.shutdown();
}
self.assign_pgs()?;
self.epoch.fetch_add(1, Ordering::SeqCst);
Ok(())
}
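    /// Recomputes the PG-to-OSD mapping for every pool via CRUSH and pushes
    /// the resulting placement groups to any locally hosted OSDs.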
fn assign_pgs(&mut self) -> Result<(), ClusterError> {
for (pool_id, pool) in &self.osd_map.pools.clone() {
for pg_num in 0..pool.pg_count {
let pgid = (*pool_id << 32) | pg_num;
let osds = self
.crush_map
.select(&pool.crush_rule, pgid, pool.size)
.map_err(ClusterError::CrushError)?;
if osds.is_empty() {
continue;
}
let primary = osds[0];
let replicas: Vec<u64> = osds.iter().skip(1).copied().collect();
let mapping = PgMapping {
pgid,
pool_id: *pool_id,
acting: osds.clone(),
up: osds.clone(),
primary,
state: PgState::Active,
last_active_epoch: self.epoch.load(Ordering::SeqCst),
};
self.osd_map.pg_map.insert(pgid, mapping);
for &osd_id in &osds {
if let Some(osd) = self.local_osds.get_mut(&osd_id) {
let pg = PlacementGroup::new(pgid, *pool_id, primary, replicas.clone());
osd.add_pg(pg);
osd.set_pg_state(pgid, PgState::Active);
}
}
}
}
Ok(())
}
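    /// Records a heartbeat from `osd_id`, clearing any pending failure and
    /// marking the OSD back up if it had previously been marked down.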
pub fn heartbeat(&mut self, osd_id: u64, timestamp: u64) {
self.failure_detector.heartbeat(osd_id, timestamp);
if let Some(osd_info) = self.osd_map.osds.get_mut(&osd_id) {
osd_info.last_heartbeat = timestamp;
if osd_info.state == OsdNodeState::Down {
osd_info.state = OsdNodeState::Up;
if !self.osd_map.up_osds.contains(&osd_id) {
self.osd_map.up_osds.push(osd_id);
}
}
}
}
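    /// Runs failure detection at `current_time`, marks silent OSDs down,
    /// flags their PGs degraded, queues recoveries when auto-recovery is
    /// enabled, starts queued recoveries, and returns the resulting events.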
pub fn check_health(&mut self, current_time: u64) -> Vec<ClusterEvent> {
let mut events = Vec::new();
let failed = self.failure_detector.check(current_time);
for osd_id in failed {
if !self.osd_map.flags.nodown {
self.osd_map.mark_down(osd_id);
events.push(ClusterEvent::OsdDown(osd_id));
for (pgid, mapping) in &mut self.osd_map.pg_map {
if mapping.acting.contains(&osd_id) {
mapping.state = PgState::Degraded;
events.push(ClusterEvent::PgDegraded(*pgid));
                    if self.config.auto_recovery && !self.osd_map.flags.norecover {
                        if let Some(&source) = mapping.acting.iter().find(|&&id| id != osd_id) {
                            // Look up the pool's CRUSH rule (rather than using the pool id
                            // as a rule name) when selecting a recovery target.
                            if let Some(pool) = self.osd_map.pools.get(&mapping.pool_id) {
                                if let Ok(new_osds) =
                                    self.crush_map.select(&pool.crush_rule, *pgid, 1)
                                {
                                    if let Some(&target) = new_osds.first() {
                                        self.recovery_manager
                                            .queue_recovery(*pgid, source, target);
                                    }
                                }
                            }
                        }
                    }
}
}
}
}
let started = self.recovery_manager.start_recoveries(current_time);
for pgid in started {
events.push(ClusterEvent::RecoveryStarted(pgid));
}
self.update_state();
events
}
fn update_state(&mut self) {
let up_count = self.osd_map.up_count();
if up_count < self.config.min_osds {
self.state = ClusterState::Offline;
} else if self.recovery_manager.active_count() > 0 {
self.state = ClusterState::Recovering;
} else if self
.osd_map
.pg_map
.values()
.any(|pg| pg.state == PgState::Degraded)
{
self.state = ClusterState::Degraded;
} else {
self.state = ClusterState::Active;
}
}
pub fn state(&self) -> ClusterState {
self.state
}
pub fn epoch(&self) -> u64 {
self.epoch.load(Ordering::SeqCst)
}
pub fn crush_map(&self) -> &CrushMap {
&self.crush_map
}
pub fn osd_map(&self) -> &OsdMap {
&self.osd_map
}
pub fn stats(&self) -> ClusterStats {
let mut total_capacity = 0u64;
let mut total_used = 0u64;
for osd in self.osd_map.osds.values() {
total_capacity += osd.capacity;
total_used += osd.used;
}
let (recovery_completed, recovery_failed, recovery_active, recovery_queued) =
self.recovery_manager.stats();
ClusterStats {
state: self.state,
epoch: self.epoch.load(Ordering::SeqCst),
osds_total: self.osd_map.osds.len(),
osds_up: self.osd_map.up_count(),
osds_in: self.osd_map.in_count(),
pgs_total: self.osd_map.pg_map.len(),
pgs_active: self
.osd_map
.pg_map
.values()
.filter(|pg| pg.state == PgState::Active)
.count(),
pgs_degraded: self
.osd_map
.pg_map
.values()
.filter(|pg| pg.state == PgState::Degraded)
.count(),
total_capacity,
total_used,
recovery_active,
recovery_queued,
recovery_completed,
recovery_failed,
}
}
pub fn client(&self) -> ClusterClient {
ClusterClient::new(
self.config.clone(),
self.crush_map.clone(),
self.osd_map.clone(),
)
}
}
#[derive(Debug, Clone)]
pub struct ClusterStats {
pub state: ClusterState,
pub epoch: u64,
pub osds_total: usize,
pub osds_up: usize,
pub osds_in: usize,
pub pgs_total: usize,
pub pgs_active: usize,
pub pgs_degraded: usize,
pub total_capacity: u64,
pub total_used: u64,
pub recovery_active: usize,
pub recovery_queued: usize,
pub recovery_completed: u64,
pub recovery_failed: u64,
}
#[derive(Debug, Clone)]
pub enum ClusterEvent {
OsdDown(u64),
OsdUp(u64),
OsdAdded(u64),
OsdRemoved(u64),
PgDegraded(u64),
PgActive(u64),
RecoveryStarted(u64),
RecoveryCompleted(u64),
EpochChanged(u64),
}
#[derive(Debug, Clone)]
pub enum ClusterError {
InsufficientOsds,
PoolNotFound(u64),
OsdNotFound(u64),
PgNotFound(u64),
NoPrimaryAvailable,
CrushError(CrushError),
OsdError(OsdError),
MdsError(MdsError),
NotReady,
NotAllowed(String),
}
impl fmt::Display for ClusterError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ClusterError::InsufficientOsds => write!(f, "Insufficient OSDs"),
ClusterError::PoolNotFound(id) => write!(f, "Pool {} not found", id),
ClusterError::OsdNotFound(id) => write!(f, "OSD {} not found", id),
ClusterError::PgNotFound(id) => write!(f, "PG {} not found", id),
ClusterError::NoPrimaryAvailable => write!(f, "No primary OSD available"),
ClusterError::CrushError(e) => write!(f, "CRUSH error: {}", e),
ClusterError::OsdError(e) => write!(f, "OSD error: {}", e),
ClusterError::MdsError(e) => write!(f, "MDS error: {}", e),
ClusterError::NotReady => write!(f, "Cluster not ready"),
ClusterError::NotAllowed(msg) => write!(f, "Not allowed: {}", msg),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn create_test_cluster() -> ClusterManager {
let config = ClusterConfig {
min_osds: 1,
replication_factor: 1,
pg_count: 16,
..Default::default()
};
ClusterManager::new(config)
}
#[test]
fn test_cluster_creation() {
let cluster = create_test_cluster();
assert_eq!(cluster.state(), ClusterState::Initializing);
}
#[test]
fn test_cluster_init() {
let mut cluster = create_test_cluster();
cluster.init_cluster().unwrap();
assert!(cluster.local_mds.is_some());
}
#[test]
fn test_add_osd() {
let mut cluster = create_test_cluster();
cluster.init_cluster().unwrap();
cluster.add_osd(0, "127.0.0.1:6800", 1.0).unwrap();
assert_eq!(cluster.osd_map.up_count(), 1);
assert_eq!(cluster.state(), ClusterState::Active);
}
#[test]
fn test_multiple_osds() {
let mut cluster = ClusterManager::new(ClusterConfig {
min_osds: 3,
replication_factor: 3,
pg_count: 32,
..Default::default()
});
cluster.init_cluster().unwrap();
cluster.add_osd(0, "node0:6800", 1.0).unwrap();
assert_eq!(cluster.state(), ClusterState::Initializing);
cluster.add_osd(1, "node1:6800", 1.0).unwrap();
assert_eq!(cluster.state(), ClusterState::Initializing);
cluster.add_osd(2, "node2:6800", 1.0).unwrap();
assert_eq!(cluster.state(), ClusterState::Active);
}
#[test]
fn test_osd_failure() {
let mut cluster = create_test_cluster();
cluster.init_cluster().unwrap();
cluster.add_osd(0, "node0:6800", 1.0).unwrap();
cluster.add_osd(1, "node1:6800", 1.0).unwrap();
cluster.heartbeat(0, 1000);
cluster.heartbeat(1, 1000);
cluster.heartbeat(1, 30000);
let events = cluster.check_health(30000);
assert!(events.iter().any(|e| matches!(e, ClusterEvent::OsdDown(0))));
}
#[test]
fn test_failure_detector() {
let mut fd = FailureDetector::new(1000, 5000);
fd.heartbeat(0, 1000);
fd.heartbeat(1, 1000);
assert!(fd.check(3000).is_empty());
let failed = fd.check(7000);
assert!(failed.contains(&0));
assert!(failed.contains(&1));
fd.heartbeat(0, 8000);
fd.clear_failure(0);
assert!(!fd.get_failed().contains(&0));
}
#[test]
fn test_recovery_manager() {
let mut rm = RecoveryManager::new(2);
rm.queue_recovery(1, 0, 2);
rm.queue_recovery(2, 0, 3);
rm.queue_recovery(3, 0, 4);
assert_eq!(rm.queued_count(), 3);
let started = rm.start_recoveries(1000);
assert_eq!(started.len(), 2);
assert_eq!(rm.active_count(), 2);
assert_eq!(rm.queued_count(), 1);
rm.complete(1);
assert_eq!(rm.active_count(), 1);
let started = rm.start_recoveries(2000);
assert_eq!(started.len(), 1);
}
#[test]
fn test_osd_map() {
let mut map = OsdMap::new();
let osd = OsdInfo {
id: 0,
addr: "localhost:6800".to_string(),
state: OsdNodeState::Up,
weight: 1.0,
failure_domain: "host-0".to_string(),
capacity: 1000000000,
used: 0,
last_heartbeat: 0,
in_epoch: 0,
up_epoch: 0,
};
map.add_osd(osd);
assert_eq!(map.up_count(), 1);
assert_eq!(map.in_count(), 1);
map.mark_down(0);
assert_eq!(map.up_count(), 0);
map.mark_up(0);
assert_eq!(map.up_count(), 1);
}
#[test]
fn test_cluster_client() {
let mut cluster = create_test_cluster();
cluster.init_cluster().unwrap();
cluster.add_osd(0, "node0:6800", 1.0).unwrap();
cluster.add_osd(1, "node1:6800", 1.0).unwrap();
let client = cluster.client();
let pg1 = client.object_to_pg(0, 1000);
let pg2 = client.object_to_pg(0, 1000);
assert_eq!(pg1, pg2);
let pg3 = client.object_to_pg(0, 2000);
let _ = pg3;
assert!(client.is_healthy());
}
#[test]
fn test_cluster_stats() {
let mut cluster = create_test_cluster();
cluster.init_cluster().unwrap();
cluster.add_osd(0, "node0:6800", 1.0).unwrap();
let stats = cluster.stats();
assert_eq!(stats.osds_total, 1);
assert_eq!(stats.osds_up, 1);
assert_eq!(stats.state, ClusterState::Active);
}
#[test]
fn test_pg_mapping() {
let mut cluster = ClusterManager::new(ClusterConfig {
min_osds: 2,
replication_factor: 2,
pg_count: 8,
..Default::default()
});
cluster.init_cluster().unwrap();
cluster.add_osd(0, "node0:6800", 1.0).unwrap();
cluster.add_osd(1, "node1:6800", 1.0).unwrap();
assert!(!cluster.osd_map.pg_map.is_empty());
for mapping in cluster.osd_map.pg_map.values() {
assert!(mapping.acting.len() <= 2);
}
}
#[test]
fn test_cluster_flags() {
let flags = ClusterFlags::default();
assert!(!flags.pauserd);
assert!(!flags.nodown);
assert!(!flags.norecover);
}
#[test]
fn test_pool_creation() {
let mut map = OsdMap::new();
let pool_id = map.create_pool("test", 64, 3);
assert_eq!(pool_id, 0);
let pool = map.pools.get(&pool_id).unwrap();
assert_eq!(pool.name, "test");
assert_eq!(pool.pg_count, 64);
assert_eq!(pool.size, 3);
        assert_eq!(pool.min_size, 2);
    }
}