d-engine-server 0.2.3

Production-ready Raft consensus engine server and runtime
use std::sync::Arc;
use std::time::Duration;

use d_engine_server::{EmbeddedEngine, RocksDBStateMachine, RocksDBStorageEngine};
use tracing::info;
use tracing_test::traced_test;

use crate::common::get_available_ports;
use crate::common::node_config;
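
// A minimal sketch of a per-node TOML builder that would deduplicate the
// hand-rolled `format!` blocks in the tests below. `node_config_toml` is
// illustrative only, not part of the d-engine-server API. The numeric
// role/status codes are passed through verbatim; in the configs below, the
// bootstrap node uses role = 3, status = 3, and a node joining as a Learner
// uses role = 4, status = 1.
#[allow(dead_code)]
fn node_config_toml(
    node_id: u64,
    listen_port: u16,
    members: &[(u64, u16, u8, u8)], // (id, port, role, status)
    db_root: &str,
    log_dir: &str,
) -> String {
    let initial_cluster = members
        .iter()
        .map(|(id, port, role, status)| {
            format!(
                "    {{ id = {id}, name = 'n{id}', address = '127.0.0.1:{port}', role = {role}, status = {status} }}"
            )
        })
        .collect::<Vec<_>>()
        .join(",\n");
    format!(
        "[cluster]\nnode_id = {node_id}\nlisten_address = '127.0.0.1:{listen_port}'\ninitial_cluster = [\n{initial_cluster}\n]\ndb_root_dir = '{db_root}'\nlog_dir = '{log_dir}'\n\n[raft]\ngeneral_raft_timeout_duration_in_ms = 5000\n"
    )
}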

/// Test scaling from a single-node to a 3-node cluster
///
/// Scenario:
/// 1. Start a single node and write data
/// 2. Start two more nodes that join node 1's cluster; node 1 should remain leader with no data loss
/// 3. Verify the cluster is healthy and the data is preserved
#[tokio::test]
#[traced_test]
async fn test_scale_single_to_cluster() -> Result<(), Box<dyn std::error::Error>> {
    let temp_dir = tempfile::tempdir()?;
    let db_root_dir = temp_dir.path().join("db");
    let log_dir = temp_dir.path().join("logs");

    let mut port_guard = get_available_ports(3).await;
    port_guard.release_listeners();
    let ports = port_guard.as_slice();

    // Phase 1: Start single-node cluster
    info!("Phase 1: Starting single-node mode");

    // Node 1 config: single-node cluster
    let node1_config = format!(
        r#"
[cluster]
node_id = 1
listen_address = '127.0.0.1:{}'
initial_cluster = [
    {{ id = 1, name = 'n1', address = '127.0.0.1:{}', role = 3, status = 3 }}
]
db_root_dir = '{}'
log_dir = '{}'

[raft]
general_raft_timeout_duration_in_ms = 5000
"#,
        ports[0],
        ports[0],
        db_root_dir.display(),
        log_dir.display()
    );

    let node1_config_path = "/tmp/scale_test_node1.toml";
    tokio::fs::write(node1_config_path, &node1_config).await?;

    let config = node_config(&node1_config);
    let storage_path = config.cluster.db_root_dir.join("node1/storage");
    let sm_path = config.cluster.db_root_dir.join("node1/state_machine");

    tokio::fs::create_dir_all(&storage_path).await?;
    tokio::fs::create_dir_all(&sm_path).await?;

    let storage1 = Arc::new(RocksDBStorageEngine::new(storage_path)?);
    let sm1 = Arc::new(RocksDBStateMachine::new(sm_path)?);

    let engine1 = EmbeddedEngine::start_custom(storage1, sm1, Some(node1_config_path)).await?;

    let leader = engine1.wait_ready(Duration::from_secs(5)).await?;
    info!(
        "Node 1 became leader: {} (term {})",
        leader.leader_id, leader.term
    );
    assert_eq!(leader.leader_id, 1, "Node 1 should be leader");

    // Write data in single-node mode
    engine1.client().put(b"dev-key".to_vec(), b"dev-value".to_vec()).await?;
    tokio::time::sleep(Duration::from_millis(100)).await;

    let val = engine1.client().get_linearizable(b"dev-key".to_vec()).await?;
    assert_eq!(val.as_deref(), Some(b"dev-value".as_ref()));
    info!("Data written to single-node cluster");

    // Phase 2: Start node 2 and node 3 as Learners to join cluster
    info!("Phase 2: Starting node 2 and node 3 to join cluster");

    // Node 2 config: joining as Learner
    let node2_config = format!(
        r#"
[cluster]
node_id = 2
listen_address = '127.0.0.1:{}'
initial_cluster = [
    {{ id = 1, name = 'n1', address = '127.0.0.1:{}', role = 3, status = 3 }},
    {{ id = 2, name = 'n2', address = '127.0.0.1:{}', role = 4, status = 1 }}
]
db_root_dir = '{}'
log_dir = '{}'

[raft]
general_raft_timeout_duration_in_ms = 5000
"#,
        ports[1],
        ports[0],
        ports[1],
        db_root_dir.display(),
        log_dir.display()
    );

    let node2_config_path = "/tmp/scale_test_node2.toml";
    tokio::fs::write(node2_config_path, &node2_config).await?;

    let config2 = node_config(&node2_config);
    let storage_path2 = config2.cluster.db_root_dir.join("node2/storage");
    let sm_path2 = config2.cluster.db_root_dir.join("node2/state_machine");

    tokio::fs::create_dir_all(&storage_path2).await?;
    tokio::fs::create_dir_all(&sm_path2).await?;

    let storage2 = Arc::new(RocksDBStorageEngine::new(storage_path2)?);
    let sm2 = Arc::new(RocksDBStateMachine::new(sm_path2)?);

    let engine2 = EmbeddedEngine::start_custom(storage2, sm2, Some(node2_config_path)).await?;
    info!("Node 2 started, joining as Learner");

    // Wait a bit for node 2 to sync and get promoted
    tokio::time::sleep(Duration::from_secs(3)).await;

    // Node 3 config: joining as Learner
    let node3_config = format!(
        r#"
[cluster]
node_id = 3
listen_address = '127.0.0.1:{}'
initial_cluster = [
    {{ id = 1, name = 'n1', address = '127.0.0.1:{}', role = 3, status = 3 }},
    {{ id = 2, name = 'n2', address = '127.0.0.1:{}', role = 1, status = 3 }},
    {{ id = 3, name = 'n3', address = '127.0.0.1:{}', role = 4, status = 1 }}
]
db_root_dir = '{}'
log_dir = '{}'

[raft]
general_raft_timeout_duration_in_ms = 5000
"#,
        ports[2],
        ports[0],
        ports[1],
        ports[2],
        db_root_dir.display(),
        log_dir.display()
    );

    let node3_config_path = "/tmp/scale_test_node3.toml";
    tokio::fs::write(node3_config_path, &node3_config).await?;

    let config3 = node_config(&node3_config);
    let storage_path3 = config3.cluster.db_root_dir.join("node3/storage");
    let sm_path3 = config3.cluster.db_root_dir.join("node3/state_machine");

    tokio::fs::create_dir_all(&storage_path3).await?;
    tokio::fs::create_dir_all(&sm_path3).await?;

    let storage3 = Arc::new(RocksDBStorageEngine::new(storage_path3)?);
    let sm3 = Arc::new(RocksDBStateMachine::new(sm_path3)?);

    let engine3 = EmbeddedEngine::start_custom(storage3, sm3, Some(node3_config_path)).await?;
    info!("Node 3 started, joining as Learner");

    // Wait for promotion and cluster stabilization
    // Learners need time to: 1) sync data, 2) get promoted to Voter by leader
    tokio::time::sleep(Duration::from_secs(5)).await;

    // Phase 3: Verify cluster health
    info!("Phase 3: Verifying cluster health");

    // Node 1 should still be leader
    let current_leader = engine1.wait_ready(Duration::from_secs(2)).await?;
    assert_eq!(
        current_leader.leader_id, 1,
        "Node 1 should remain as leader"
    );
    info!("Verified: Node 1 is still leader after expansion");

    // Old data should be preserved
    let old_val = engine1.client().get_linearizable(b"dev-key".to_vec()).await?;
    assert_eq!(
        old_val.as_deref(),
        Some(b"dev-value".as_ref()),
        "Old data should be preserved"
    );
    info!("Verified: Old data preserved");

    // Write new data in cluster mode
    engine1.client().put(b"cluster-key".to_vec(), b"cluster-value".to_vec()).await?;
    tokio::time::sleep(Duration::from_millis(500)).await;

    // All nodes should be able to read replicated data
    for (i, engine) in [&engine1, &engine2, &engine3].iter().enumerate() {
        let val = engine.client().get_eventual(b"cluster-key".to_vec()).await?;
        assert_eq!(
            val.as_deref(),
            Some(b"cluster-value".as_ref()),
            "Node {} should have replicated data",
            i + 1
        );
    }
    info!("Verified: All 3 nodes can read replicated data");

    // Cleanup
    engine3.stop().await?;
    engine2.stop().await?;
    engine1.stop().await?;

    Ok(())
}
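
// A minimal sketch, not wired into the tests: both tests repeat the same
// storage bootstrap for every node (create the directories, then open a
// RocksDB storage engine and state machine). A helper like this would
// centralize that pattern. It assumes, as the calls above do, that
// `RocksDBStorageEngine::new` and `RocksDBStateMachine::new` accept a
// `PathBuf` and that their errors convert into `Box<dyn std::error::Error>`.
#[allow(dead_code)]
fn open_node_storage(
    db_root: &std::path::Path,
    node: &str,
) -> Result<(Arc<RocksDBStorageEngine>, Arc<RocksDBStateMachine>), Box<dyn std::error::Error>> {
    let storage_path = db_root.join(node).join("storage");
    let sm_path = db_root.join(node).join("state_machine");
    std::fs::create_dir_all(&storage_path)?;
    std::fs::create_dir_all(&sm_path)?;
    Ok((
        Arc::new(RocksDBStorageEngine::new(storage_path)?),
        Arc::new(RocksDBStateMachine::new(sm_path)?),
    ))
}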

/// Test leader failover after dynamic scaling from single-node to 3-node cluster
///
/// Test Scenario:
/// This test validates the critical path of dynamic cluster expansion followed by leader failure,
/// ensuring the cluster maintains availability and data consistency throughout the process.
///
/// Phase 1: Single-Node Bootstrap
/// - Start Node 1 as standalone leader (initial_cluster = [n1])
/// - Node 1 automatically wins election (is_single_node_cluster() = true)
/// - Write baseline data: "phase1-key" = "phase1-value"
/// - Verify: Data committed successfully
///
/// Phase 2: Dynamic Expansion to 3-Node Cluster
/// - Start Node 2 as Learner (initial_cluster = [n1, n2], role=Learner)
/// - Start Node 3 as Learner (initial_cluster = [n1, n2, n3], role=Learner)
/// - Wait for Learners to sync data and auto-promote to Voters
/// - Verify: Node 1 remains leader, all 3 nodes operational
/// - Write cluster data: "phase2-key" = "phase2-value"
/// - Verify: Data replicated to all 3 nodes
///
/// Phase 3: Leader Failover
/// - Stop Node 1 (original leader crashes)
/// - Verify: Node 2 or Node 3 conducts election (is_single_node_cluster() = false)
/// - Verify: New leader elected within election timeout (~3-6 seconds)
/// - Verify: New leader ID is 2 or 3 (not the crashed node)
///
/// Phase 4: Verify New Leader Election
/// - Check new leader from Node 2's perspective
/// - Critical assertion: New leader must NOT be the crashed node
///
/// Phase 5: Post-Failover Service Continuity
/// - Write new data via new leader: "phase3-key" = "phase3-value"
/// - Verify: Write succeeds with 2/3 quorum
/// - Read from remaining 2 nodes, verify:
///   * "phase1-key" preserved (data before expansion)
///   * "phase2-key" preserved (data during 3-node operation)
///   * "phase3-key" replicated (data after failover)
///
/// Phase 6: Node Rejoin and Re-Replication Validation
/// - Restart Node 1 as a follower (rejoins the existing 3-node cluster)
/// - Verify: Node 1 recognizes the current leader (Node 2 or 3)
/// - Wait for Node 1 to catch up, polling get_eventual() until each key is visible locally:
///   * "phase1-key" (data written before expansion)
///   * "phase2-key" (data written during 3-node operation)
///   * "phase3-key" (data written after failover)
/// - Write "phase6-key" = "phase6-value" via the current leader
/// - Verify: "phase6-key" replicated to all three nodes
///
/// Critical Assertions:
/// 1. Node 2/3 must NOT skip election (is_single_node_cluster() = false)
/// 2. New leader must be elected within reasonable time (< 10 seconds)
/// 3. No data loss across all phases
/// 4. Cluster remains operational with 2/3 majority
#[tokio::test]
#[traced_test]
async fn test_leader_failover_after_dynamic_scaling() -> Result<(), Box<dyn std::error::Error>> {
    let temp_dir = tempfile::tempdir()?;
    let db_root_dir = temp_dir.path().join("db");
    let log_dir = temp_dir.path().join("logs");

    let mut port_guard = get_available_ports(3).await;
    port_guard.release_listeners();
    let ports = port_guard.as_slice();
    let db_root = format!("{}_failover", db_root_dir.display());
    let log_dir = format!("{}_failover", log_dir.display());

    // ============================================================================
    // Phase 1: Single-Node Bootstrap
    // ============================================================================
    info!("Phase 1: Starting single-node cluster");

    let node1_config = format!(
        r#"
[cluster]
node_id = 1
listen_address = '127.0.0.1:{}'
initial_cluster = [
    {{ id = 1, name = 'n1', address = '127.0.0.1:{}', role = 3, status = 3 }}
]
db_root_dir = '{}'
log_dir = '{}'

[raft]
general_raft_timeout_duration_in_ms = 100
[raft.election]
election_timeout_min = 300
election_timeout_max = 600
"#,
        ports[0], ports[0], db_root, log_dir
    );

    let node1_config_path = "/tmp/failover_test_node1.toml";
    tokio::fs::write(node1_config_path, &node1_config).await?;

    let config1 = node_config(&node1_config);
    let node1_db_root = config1.cluster.db_root_dir.join("node1");
    let storage_path1 = node1_db_root.join("storage");
    let sm_path1 = node1_db_root.join("state_machine");

    tokio::fs::create_dir_all(&storage_path1).await?;
    tokio::fs::create_dir_all(&sm_path1).await?;

    let storage1 = Arc::new(RocksDBStorageEngine::new(storage_path1)?);
    let sm1 = Arc::new(RocksDBStateMachine::new(sm_path1)?);

    let engine1 = EmbeddedEngine::start_custom(storage1, sm1, Some(node1_config_path)).await?;

    let initial_leader = engine1.wait_ready(Duration::from_secs(5)).await?;
    info!(
        "Phase 1 Complete: Node {} elected as leader (term {})",
        initial_leader.leader_id, initial_leader.term
    );
    assert_eq!(
        initial_leader.leader_id, 1,
        "Node 1 should be initial leader"
    );

    // Write baseline data
    engine1.client().put(b"phase1-key".to_vec(), b"phase1-value".to_vec()).await?;
    tokio::time::sleep(Duration::from_millis(100)).await;

    let val = engine1.client().get_linearizable(b"phase1-key".to_vec()).await?;
    assert_eq!(val.as_deref(), Some(b"phase1-value".as_ref()));
    info!("Phase 1: Baseline data written successfully");

    // ============================================================================
    // Phase 2: Dynamic Expansion to 3-Node Cluster
    // ============================================================================
    info!("Phase 2: Expanding to 3-node cluster");

    // Start Node 2 as Learner
    let node2_config = format!(
        r#"
[cluster]
node_id = 2
listen_address = '127.0.0.1:{}'
initial_cluster = [
    {{ id = 1, name = 'n1', address = '127.0.0.1:{}', role = 3, status = 3 }},
    {{ id = 2, name = 'n2', address = '127.0.0.1:{}', role = 4, status = 1 }}
]
db_root_dir = '{db_root}'
log_dir = '{log_dir}'

[raft]
general_raft_timeout_duration_in_ms = 5000
[raft.election]
election_timeout_min = 3000
election_timeout_max = 6000
"#,
        ports[1], ports[0], ports[1]
    );

    let node2_config_path = "/tmp/failover_test_node2.toml";
    tokio::fs::write(node2_config_path, &node2_config).await?;

    let config2 = node_config(&node2_config);
    let node2_db_root = config2.cluster.db_root_dir.join("node2");
    let storage_path2 = node2_db_root.join("storage");
    let sm_path2 = node2_db_root.join("state_machine");

    tokio::fs::create_dir_all(&storage_path2).await?;
    tokio::fs::create_dir_all(&sm_path2).await?;

    let storage2 = Arc::new(RocksDBStorageEngine::new(storage_path2)?);
    let sm2 = Arc::new(RocksDBStateMachine::new(sm_path2)?);

    let engine2 = EmbeddedEngine::start_custom(storage2, sm2, Some(node2_config_path)).await?;
    info!("Node 2 started as Learner");

    tokio::time::sleep(Duration::from_secs(3)).await;

    // Start Node 3 as Learner
    let node3_config = format!(
        r#"
[cluster]
node_id = 3
listen_address = '127.0.0.1:{}'
initial_cluster = [
    {{ id = 1, name = 'n1', address = '127.0.0.1:{}', role = 3, status = 3 }},
    {{ id = 2, name = 'n2', address = '127.0.0.1:{}', role = 1, status = 3 }},
    {{ id = 3, name = 'n3', address = '127.0.0.1:{}', role = 4, status = 1 }}
]
db_root_dir = '{db_root}'
log_dir = '{log_dir}'

[raft]
general_raft_timeout_duration_in_ms = 5000
[raft.election]
election_timeout_min = 3000
election_timeout_max = 6000
"#,
        ports[2], ports[0], ports[1], ports[2]
    );

    let node3_config_path = "/tmp/failover_test_node3.toml";
    tokio::fs::write(node3_config_path, &node3_config).await?;

    let config3 = node_config(&node3_config);
    let node3_db_root = config3.cluster.db_root_dir.join("node3");
    let storage_path3 = node3_db_root.join("storage");
    let sm_path3 = node3_db_root.join("state_machine");

    tokio::fs::create_dir_all(&storage_path3).await?;
    tokio::fs::create_dir_all(&sm_path3).await?;

    let storage3 = Arc::new(RocksDBStorageEngine::new(storage_path3)?);
    let sm3 = Arc::new(RocksDBStateMachine::new(sm_path3)?);

    let engine3 = EmbeddedEngine::start_custom(storage3, sm3, Some(node3_config_path)).await?;
    info!("Node 3 started as Learner");

    // Wait for Learner promotion and cluster stabilization
    tokio::time::sleep(Duration::from_secs(15)).await;

    // Verify Node 1 still leader
    let leader_before_failover = engine1.wait_ready(Duration::from_secs(2)).await?;
    assert_eq!(
        leader_before_failover.leader_id, 1,
        "Node 1 should still be leader"
    );

    // Write data in 3-node cluster
    engine1.client().put(b"phase2-key".to_vec(), b"phase2-value".to_vec()).await?;
    tokio::time::sleep(Duration::from_millis(500)).await;

    // Verify replication to all nodes
    for (i, engine) in [&engine1, &engine2, &engine3].iter().enumerate() {
        let val = engine.client().get_eventual(b"phase2-key".to_vec()).await?;
        assert_eq!(
            val.as_deref(),
            Some(b"phase2-value".as_ref()),
            "Node {} should have phase2 data",
            i + 1
        );
    }
    info!("Phase 2 Complete: 3-node cluster operational, data replicated");

    // ============================================================================
    // Phase 3: Leader Failover - Simulate Node 1 Crash
    // ============================================================================
    info!("Phase 3: Simulating leader crash (stopping Node 1)");

    engine1.stop().await?;
    info!("Node 1 stopped - cluster should detect leader failure and start election");

    // Wait for election timeout + re-election
    // With election_timeout_min=3000ms, election should complete within ~5-8 seconds
    tokio::time::sleep(Duration::from_secs(8)).await;

    // ============================================================================
    // Phase 4: Verify New Leader Election
    // ============================================================================
    info!("Phase 4: Verifying new leader election");

    // Check new leader from Node 2's perspective
    let new_leader = engine2.wait_ready(Duration::from_secs(5)).await?;
    info!(
        "New leader elected: Node {} (term {})",
        new_leader.leader_id, new_leader.term
    );

    // Critical assertion: New leader must NOT be the crashed node
    assert_ne!(
        new_leader.leader_id, 1,
        "New leader cannot be the crashed Node 1"
    );
    assert!(
        new_leader.leader_id == 2 || new_leader.leader_id == 3,
        "New leader must be Node 2 or Node 3"
    );
    assert!(
        new_leader.term > initial_leader.term,
        "New leader must have higher term than initial leader"
    );

    // Determine new leader's engine
    let new_leader_engine = if new_leader.leader_id == 2 {
        &engine2
    } else {
        &engine3
    };

    // ============================================================================
    // Phase 5: Verify Service Continuity After Failover
    // ============================================================================
    info!("Phase 5: Verifying service continuity with new leader");

    // Write data via new leader
    new_leader_engine
        .client()
        .put(b"phase3-key".to_vec(), b"phase3-value".to_vec())
        .await?;
    tokio::time::sleep(Duration::from_millis(500)).await;

    // Verify all historical data preserved on remaining nodes
    for (i, engine) in [&engine2, &engine3].iter().enumerate() {
        let node_id = if i == 0 { 2 } else { 3 };

        // Phase 1 data (before expansion)
        let val1 = engine.client().get_eventual(b"phase1-key".to_vec()).await?;
        assert_eq!(
            val1.as_deref(),
            Some(b"phase1-value".as_ref()),
            "Node {node_id} should preserve phase1 data"
        );

        // Phase 2 data (during 3-node operation)
        let val2 = engine.client().get_eventual(b"phase2-key".to_vec()).await?;
        assert_eq!(
            val2.as_deref(),
            Some(b"phase2-value".as_ref()),
            "Node {node_id} should preserve phase2 data"
        );

        // Phase 3 data (after failover)
        let val3 = engine.client().get_eventual(b"phase3-key".to_vec()).await?;
        assert_eq!(
            val3.as_deref(),
            Some(b"phase3-value".as_ref()),
            "Node {node_id} should have replicated phase3 data"
        );
    }

    info!("Phase 5 Complete: All data preserved, cluster operational with 2/3 nodes");

    // ============================================================================
    // Phase 6: Node 1 Rejoin and Verify Linearizable Read
    // ============================================================================
    info!("Phase 6: Restarting Node 1 as follower and verifying linearizable read");

    // Restart Node 1 (it was stopped in Phase 3)
    // Node 1 will rejoin as a follower since it already has cluster membership
    // Must use db_root (with the _failover suffix), NOT db_root_dir: all nodes in this test use db_root
    let node1_db_root = std::path::PathBuf::from(&db_root).join("node1");
    let node1_storage_path = node1_db_root.join("storage");
    let node1_sm_path = node1_db_root.join("state_machine");

    // Reuse existing storage/state_machine directories (preserved from earlier phases)
    let node1_storage = Arc::new(RocksDBStorageEngine::new(node1_storage_path)?);
    let node1_state_machine = Arc::new(RocksDBStateMachine::new(node1_sm_path)?);

    // Node 1 config: rejoining as existing follower
    let node1_config_str = format!(
        r#"
[cluster]
node_id = 1
listen_address = '127.0.0.1:{}'
initial_cluster = [
    {{ id = 1, name = 'n1', address = '127.0.0.1:{}', role = 1, status = 3 }},
    {{ id = 2, name = 'n2', address = '127.0.0.1:{}', role = 1, status = 3 }},
    {{ id = 3, name = 'n3', address = '127.0.0.1:{}', role = 1, status = 3 }}
]
db_root_dir = '{}'
log_dir = '{log_dir}'

[raft]
general_raft_timeout_duration_in_ms = 5000
[raft.election]
# Match nodes 2 and 3 so the rejoined follower does not trigger an election
election_timeout_min = 3000
election_timeout_max = 6000
"#,
        ports[0], ports[0], ports[1], ports[2], db_root
    );

    let node1_config_path = "/tmp/d-engine-test-node1-phase6.toml";
    tokio::fs::write(node1_config_path, &node1_config_str).await?;

    let engine1 =
        EmbeddedEngine::start_custom(node1_storage, node1_state_machine, Some(node1_config_path))
            .await?;

    info!("Node 1 restarted, waiting for leader recognition");
    tokio::time::sleep(Duration::from_secs(2)).await;

    // Verify Node 1 rejoined the cluster (should be a follower)
    let node1_leader = engine1.wait_ready(Duration::from_secs(5)).await?;
    info!(
        "Node 1 back in cluster: leader is {} (term {})",
        node1_leader.leader_id, node1_leader.term
    );
    assert!(
        node1_leader.leader_id == 2 || node1_leader.leader_id == 3,
        "Node 1 should recognize current leader"
    );

    // Phase 6a: Verify data is synced to the rejoined Node 1.
    // Node 1 rejoins as a follower; wait_ready only guarantees leader recognition,
    // not that log replication has caught up, so retry until the data is visible locally.
    info!("Phase 6a: Waiting for Node 1 to sync data after rejoin");

    for (key, expected) in [
        (b"phase1-key".as_slice(), b"phase1-value".as_slice()),
        (b"phase2-key".as_slice(), b"phase2-value".as_slice()),
        (b"phase3-key".as_slice(), b"phase3-value".as_slice()),
    ] {
        // Poll up to 20 times (10s total) for the replicated value to appear
        let mut val = None;
        for _ in 0..20 {
            val = engine1.client().get_eventual(key.to_vec()).await?;
            if val.is_some() {
                break;
            }
            tokio::time::sleep(Duration::from_millis(500)).await;
        }
        assert_eq!(
            val.as_deref(),
            Some(expected),
            "Node 1 should have {} after sync",
            String::from_utf8_lossy(key)
        );
    }

    info!("Phase 6a Complete: All data synced to Node 1 after rejoin");

    // Phase 6b: Verify cluster consistency with all 3 nodes active
    info!("Phase 6b: Verifying 3-node cluster consistency");

    // Write new data via current leader
    new_leader_engine
        .client()
        .put(b"phase6-key".to_vec(), b"phase6-value".to_vec())
        .await?;

    tokio::time::sleep(Duration::from_millis(500)).await;

    // Verify all three nodes have the new data (engine2 and engine3 are both
    // alive, so check each directly rather than going through new_leader_engine)
    for (engine, node_id) in [(&engine1, 1), (&engine2, 2), (&engine3, 3)] {
        let val = engine.client().get_eventual(b"phase6-key".to_vec()).await?;
        assert_eq!(
            val.as_deref(),
            Some(b"phase6-value".as_ref()),
            "Node {node_id} should have phase6 data"
        );
    }

    info!("Phase 6b Complete: 3-node cluster fully synchronized");
    info!("Phase 6 Complete: Node 1 rejoin and linearizable read validation passed");

    // ============================================================================
    // Cleanup
    // ============================================================================
    engine3.stop().await?;
    engine2.stop().await?;
    engine1.stop().await?;

    info!("Test completed successfully - dynamic scaling with leader failover validated");
    Ok(())
}
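
// A minimal retry sketch generalizing the Phase 6a polling above. The test
// polls get_eventual() until a rejoined follower has caught up; a closure-based
// helper like this expresses that pattern once. It is illustrative only, not a
// d-engine-server API: the caller supplies a single read attempt and the helper
// retries with a fixed delay until a value appears or the retries run out.
#[allow(dead_code)]
async fn retry_until_some<F, Fut, T, E>(
    mut attempt: F,
    retries: usize,
    delay: Duration,
) -> Result<Option<T>, E>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<Option<T>, E>>,
{
    for _ in 0..retries {
        if let Some(v) = attempt().await? {
            return Ok(Some(v));
        }
        tokio::time::sleep(delay).await;
    }
    Ok(None)
}
//
// Hypothetical usage against the engines above (assuming the future returned
// by get_eventual does not borrow a temporary client):
//
//     let val = retry_until_some(
//         || engine1.client().get_eventual(b"phase1-key".to_vec()),
//         20,
//         Duration::from_millis(500),
//     )
//     .await?;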