use std::time::Duration;
use d_engine_core::ClientApiError;
use d_engine_proto::client::ReadConsistencyPolicy;
use tracing_test::traced_test;
use crate::client_manager::ClientManager;
use crate::common::TestContext;
use crate::common::WAIT_FOR_NODE_READY_IN_SEC;
use crate::common::check_cluster_is_ready;
use crate::common::create_bootstrap_urls;
use crate::common::create_node_config;
use crate::common::get_available_ports;
use crate::common::node_config;
use crate::common::reset;
use crate::common::start_node;
use crate::common::test_put_get;
#[tokio::test]
#[traced_test]
async fn test_readonly_mode_learner_standalone() -> Result<(), ClientApiError> {
const TEST_DIR: &str = "cluster_start_stop/analytics_rpc";
const DB_ROOT_DIR: &str = "./db/cluster_start_stop/analytics_rpc";
const LOG_DIR: &str = "./logs/cluster_start_stop/analytics_rpc";
reset(TEST_DIR).await?;
let mut port_guard = get_available_ports(4).await;
port_guard.release_listeners();
let ports = port_guard.as_slice();
println!("\n╔════════════════════════════════════════════════════════════╗");
println!("║ Analytics Node Test - Standalone/RPC Mode ║");
println!("╚════════════════════════════════════════════════════════════╝\n");
println!("[Phase 1] Starting 3-node primary cluster...");
let mut ctx = TestContext {
graceful_txs: Vec::new(),
node_handles: Vec::new(),
};
for (i, port) in ports[..3].iter().enumerate() {
let (graceful_tx, node_handle) = start_node(
node_config(
&create_node_config((i + 1) as u64, *port, &ports[..3], DB_ROOT_DIR, LOG_DIR).await,
),
None,
None,
)
.await?;
ctx.graceful_txs.push(graceful_tx);
ctx.node_handles.push(node_handle);
}
tokio::time::sleep(Duration::from_secs(WAIT_FOR_NODE_READY_IN_SEC)).await;
for port in &ports[..3] {
check_cluster_is_ready(&format!("127.0.0.1:{port}"), 10).await?;
}
println!("[Phase 1] ✓ 3-node primary cluster ready with 3 voters\n");
println!("[Phase 2] Writing test data to primary cluster...");
let bootstrap_urls = create_bootstrap_urls(&ports[..3]);
let mut client_manager = ClientManager::new(&bootstrap_urls).await?;
test_put_get(&mut client_manager, 1, 100).await?;
test_put_get(&mut client_manager, 2, 101).await?;
println!("[Phase 2] ✓ Test data written via RPC (key1=100, key2=101)\n");
println!("[Phase 3] Starting node 4 as standalone ReadOnly Learner...");
let node4_config = format!(
r#"
[cluster]
node_id = 4
listen_address = '127.0.0.1:{}'
initial_cluster = [
{{ id = 1, name = 'n1', address = '127.0.0.1:{}', role = 1, status = 3 }},
{{ id = 2, name = 'n2', address = '127.0.0.1:{}', role = 1, status = 3 }},
{{ id = 3, name = 'n3', address = '127.0.0.1:{}', role = 1, status = 3 }},
{{ id = 4, name = 'n4', address = '127.0.0.1:{}', role = 4, status = 2 }}
]
db_root_dir = '{}'
log_dir = '{}'
[raft]
general_raft_timeout_duration_in_ms = 5000
"#,
ports[3], ports[0], ports[1], ports[2], ports[3], DB_ROOT_DIR, LOG_DIR
);
let (graceful_tx4, node_n4) = start_node(node_config(&node4_config), None, None).await?;
ctx.graceful_txs.push(graceful_tx4);
ctx.node_handles.push(node_n4);
tokio::time::sleep(Duration::from_secs(WAIT_FOR_NODE_READY_IN_SEC / 2)).await;
println!(" ✓ Node 4 process started");
println!("\n[DEBUG] Refreshing client manager to fetch latest cluster state...");
client_manager.refresh(None).await?;
println!("[DEBUG] Checking cluster membership configuration...");
let members = client_manager.list_members().await?;
println!("[DEBUG] Total members in cluster: {}", members.len());
for member in &members {
println!(
"[DEBUG] Node {}: role={}, status={}, address={}",
member.id, member.role, member.status, member.address
);
}
let leader_id = client_manager.list_leader_id().await?;
println!("[DEBUG] Current leader ID: {leader_id:?}\n");
println!("[Phase 3] ✓ Node 4 joined cluster as READ_ONLY Learner\n");
println!("[Phase 4] Verifying node 4 read-only capability...");
let node4_endpoint = format!("http://127.0.0.1:{}", ports[3]);
let value1 = ClientManager::read_from_node(
&node4_endpoint,
1,
ReadConsistencyPolicy::EventualConsistency,
)
.await?;
assert_eq!(value1, 100, "Node 4 should have key1=100");
println!(" ✓ Node 4 successfully read key1=100 (historical data synced)");
let value2 = ClientManager::read_from_node(
&node4_endpoint,
2,
ReadConsistencyPolicy::EventualConsistency,
)
.await?;
assert_eq!(value2, 101, "Node 4 should have key2=101");
println!(" ✓ Node 4 successfully read key2=101 (historical data synced)");
println!("[Phase 4] ✓ Analytics node synced all historical data\n");
println!("[Phase 5] Verifying cluster quorum remains unchanged...");
test_put_get(&mut client_manager, 3, 102).await?;
println!(" ✓ New write succeeded through primary cluster");
let value3 = ClientManager::read_from_node(
&node4_endpoint,
3,
ReadConsistencyPolicy::EventualConsistency,
)
.await?;
assert_eq!(
value3, 102,
"Node 4 should have received new write key3=102"
);
println!(" ✓ ReadOnly Learner received new write (passive replication works)");
println!("[Phase 5] ✓ Quorum unaffected, ReadOnly Learner receives updates\n");
println!("[Phase 6] Verifying node 4 status remains READ_ONLY...");
let members = client_manager.list_members().await?;
let node4_meta_current = members.iter().find(|m| m.id == 4).expect("Node 4 should exist");
assert_eq!(
node4_meta_current.status,
d_engine_proto::common::NodeStatus::ReadOnly as i32,
"Node 4 should remain READ_ONLY forever"
);
assert_eq!(
node4_meta_current.role,
d_engine_proto::common::NodeRole::Learner as i32, "Node 4 should remain LEARNER role"
);
println!(" ✓ Node 4 status verified as READ_ONLY (not promoted to Voter)");
println!("[Phase 6] ✓ ReadOnly Learner protection confirmed\n");
println!("╔════════════════════════════════════════════════════════════╗");
println!("║ TEST RESULTS: READONLY LEARNER (RPC STANDALONE) ║");
println!("╠════════════════════════════════════════════════════════════╣");
println!("║ ✓ JoinCluster RPC successfully adds nodes dynamically ║");
println!("║ ✓ Nodes can be added with LEARNER role (role=4) ║");
println!("║ ✓ READ_ONLY status (status=2) prevents auto-promotion ║");
println!("║ ✓ ReadOnly Learners receive all replicated data ║");
println!("║ ✓ Quorum calculations exclude ReadOnly Learners ║");
println!("║ ║");
println!("║ CONCLUSION: ReadOnly Learners are FULLY SUPPORTED ✅ ║");
println!("║ Permanent Analytics nodes work correctly ║");
println!("║ in Standalone/RPC mode ║");
println!("╚════════════════════════════════════════════════════════════╝\n");
ctx.shutdown().await
}