use super::*;
use crate::storage::HybridStorage;
use std::sync::Arc;
use tokio;
/// A freshly constructed coordinator must not claim leadership of a partition
/// it was never assigned, and must not report any leader for that partition.
#[tokio::test]
async fn test_replication_coordinator_creation() {
    let (topic, partition) = ("test-topic", 0);
    let coordinator = ReplicationCoordinator::new(1, ReplicationConfig::default());

    let leads_partition = coordinator.is_leader(topic, partition).await;
    assert!(!leads_partition);

    let known_leader = coordinator.get_leader(topic, partition).await;
    assert_eq!(known_leader, None);
}
/// After `become_leader`, the coordinator must report itself as the leader and
/// expose a partition state carrying the leader role, the full replica list,
/// and an in-sync replica list that contains only this broker.
#[tokio::test]
async fn test_become_leader() {
    let (topic, partition) = ("test-topic", 0);
    let broker_id = 1;
    let coordinator = ReplicationCoordinator::new(broker_id, ReplicationConfig::default());

    coordinator
        .become_leader(topic, partition, vec![1, 2, 3])
        .await
        .unwrap();

    let leads_partition = coordinator.is_leader(topic, partition).await;
    assert!(leads_partition);

    let known_leader = coordinator.get_leader(topic, partition).await;
    assert_eq!(known_leader, Some(broker_id));

    let state = coordinator
        .get_partition_state(topic, partition)
        .await
        .unwrap();
    assert_eq!(state.role, ReplicationRole::Leader);
    assert_eq!(state.topic, topic);
    assert_eq!(state.partition, partition);
    assert_eq!(state.replicas, vec![1, 2, 3]);
    assert_eq!(state.in_sync_replicas, vec![broker_id]);
}
/// After `become_follower`, the coordinator must not claim leadership, must
/// report the given leader, and must expose a partition state carrying the
/// follower role, the replica list `[leader_id, broker_id]`, and an in-sync
/// replica list that contains only the leader.
#[tokio::test]
async fn test_become_follower() {
    let (topic, partition) = ("test-topic", 0);
    let (leader_id, broker_id) = (1, 2);
    let coordinator = ReplicationCoordinator::new(broker_id, ReplicationConfig::default());

    coordinator
        .become_follower(topic, partition, leader_id)
        .await
        .unwrap();

    let leads_partition = coordinator.is_leader(topic, partition).await;
    assert!(!leads_partition);

    let known_leader = coordinator.get_leader(topic, partition).await;
    assert_eq!(known_leader, Some(leader_id));

    let state = coordinator
        .get_partition_state(topic, partition)
        .await
        .unwrap();
    assert_eq!(state.role, ReplicationRole::Follower { leader_id });
    assert_eq!(state.topic, topic);
    assert_eq!(state.partition, partition);
    assert_eq!(state.replicas, vec![leader_id, broker_id]);
    assert_eq!(state.in_sync_replicas, vec![leader_id]);
}
/// A newly built `LeaderState` must include the leader's own broker id in the
/// in-sync replica set returned by `get_isr`.
#[tokio::test]
async fn test_leader_state_creation() {
    let broker_id = 1;
    let topic = "test-topic".to_string();
    let partition = 0;
    let followers = vec![2, 3];
    let config = ReplicationConfig::default();

    // `topic` and `followers` are not used again, so they are moved rather
    // than cloned.
    let leader_state = LeaderState::new(broker_id, topic, partition, followers, config);

    let isr = leader_state.get_isr().await;
    assert!(isr.contains(&broker_id));
    assert!(!isr.is_empty());
}
/// A newly built `FollowerState` must start with term, commit index, and
/// last-applied index all equal to zero.
#[tokio::test]
async fn test_follower_state_creation() {
    let broker_id = 2;
    let leader_id = 1;
    let topic = "test-topic".to_string();
    let partition = 0;
    let config = ReplicationConfig::default();

    // `topic` is not used again, so it is moved rather than cloned.
    let follower_state = FollowerState::new(broker_id, topic, partition, leader_id, config);

    assert_eq!(follower_state.get_current_term().await, 0);
    assert_eq!(follower_state.get_commit_index().await, 0);
    assert_eq!(follower_state.get_last_applied().await, 0);
}
/// A `ReplicationManager` built on fresh storage must accept a leader
/// registration for a partition without returning an error.
#[tokio::test]
async fn test_replication_manager_creation() {
    // Keep test data out of the working directory: use a per-process location
    // under the system temp directory instead of a hard-coded relative path.
    let data_dir =
        std::env::temp_dir().join(format!("test-data-replication-{}", std::process::id()));

    {
        let data_path = data_dir
            .to_str()
            .expect("temp directory path should be valid UTF-8");
        let storage = Arc::new(HybridStorage::new(data_path).unwrap());
        let broker_id = 1;
        let config = ReplicationConfig::default();
        let manager = ReplicationManager::new(broker_id, storage, config);

        let followers = vec![2, 3];
        manager
            .add_leader("test-topic", 0, followers)
            .await
            .unwrap();
    }

    // Best-effort cleanup once everything that referenced the storage has been
    // dropped. NOTE(review): assumes `HybridStorage::new` treats its argument
    // as a data directory; a failure here is deliberately ignored.
    let _ = std::fs::remove_dir_all(&data_dir);
}
/// After `add_follower`, a `FollowerSync` must return the follower for that
/// partition, report itself as caught up, and list the partition in its
/// follower status map.
#[tokio::test]
async fn test_follower_sync_creation() {
    // Keep test data out of the working directory: use a per-process location
    // under the system temp directory instead of a hard-coded relative path.
    let data_dir =
        std::env::temp_dir().join(format!("test-data-follower-{}", std::process::id()));

    {
        let data_path = data_dir
            .to_str()
            .expect("temp directory path should be valid UTF-8");
        let storage = Arc::new(HybridStorage::new(data_path).unwrap());
        let broker_id = 2;
        let config = ReplicationConfig::default();
        let sync = FollowerSync::new(broker_id, storage, config);

        let leader_id = 1;
        sync.add_follower("test-topic", 0, leader_id).await.unwrap();

        // `expect` states the expectation directly instead of an `is_some()`
        // check followed by `unwrap()`.
        let _follower_state = sync
            .get_follower("test-topic", 0)
            .await
            .expect("follower should be registered after add_follower");

        assert!(sync.is_caught_up().await);

        let status = sync.get_follower_status().await;
        assert!(status.contains_key(&("test-topic".to_string(), 0)));
    }

    // Best-effort cleanup once everything that referenced the storage has been
    // dropped. NOTE(review): assumes `HybridStorage::new` treats its argument
    // as a data directory; a failure here is deliberately ignored.
    let _ = std::fs::remove_dir_all(&data_dir);
}
/// Pins the field values produced by `ReplicationConfig::default()`.
///
/// This is a plain synchronous test: nothing in it awaits, so no async
/// runtime is needed.
#[test]
fn test_replication_config_defaults() {
    let config = ReplicationConfig::default();

    assert_eq!(config.min_isr, 1);
    assert_eq!(config.replication_factor, 3);
    assert_eq!(config.heartbeat_interval_ms, 1000);
    assert_eq!(config.replication_timeout_ms, 5000);
    assert_eq!(config.max_batch_size, 1000);
}
/// A `LogEntry` built from a struct literal must expose the offset, term, and
/// message payload it was given.
///
/// This is a plain synchronous test: nothing in it awaits, so no async
/// runtime is needed.
#[test]
fn test_log_entry_creation() {
    use crate::protocol::Message;
    // Explicit import so `u64::try_from` also resolves on pre-2021 editions.
    use std::convert::TryFrom;
    use std::time::{SystemTime, UNIX_EPOCH};

    // Current wall-clock time in milliseconds since the Unix epoch. The
    // conversion is checked rather than silently truncating the `u128`
    // returned by `as_millis`.
    let now_millis = || -> u64 {
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock should not be set before the Unix epoch");
        u64::try_from(since_epoch.as_millis()).expect("millisecond timestamp should fit in u64")
    };

    let message = Message {
        key: Some(bytes::Bytes::from("test-key")),
        value: bytes::Bytes::from("test-value"),
        headers: std::collections::HashMap::new(),
        timestamp: now_millis(),
    };
    let entry = LogEntry {
        offset: 42,
        term: 1,
        message: message.clone(),
        timestamp: now_millis(),
    };

    assert_eq!(entry.offset, 42);
    assert_eq!(entry.term, 1);
    assert_eq!(entry.message.value, message.value);
}