yantrikdb-server 0.8.17

YantrikDB database server — multi-tenant cognitive memory with wire protocol, HTTP gateway, replication, auto-failover, and at-rest encryption
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
//! Raft cluster assembly — wires log_storage + state_machine +
//! HttpRaftNetworkFactory + Raft into a working cluster, **and**
//! enforces the production invariants the individual components can't
//! enforce on their own.
//!
//! ## Production invariants (fail-fast at assembly time)
//!
//! 1. **mTLS is required when cluster mode is OpenRaft.** If
//!    [`RaftClusterMode::OpenRaft`] is requested but `cluster_tls`
//!    isn't fully specified, [`build_raft_cluster`] returns
//!    [`AssemblyError::MtlsRequired`] before any sockets are opened.
//!    This means an operator can't accidentally ship plaintext cluster
//!    traffic — a misconfigured server refuses to start at all.
//! 2. **Dev-mode is allowed but loud.** A deployment that sets
//!    `cluster_tls.dev_mode = true` AND OpenRaft mode is permitted (so
//!    dev clusters can run with self-signed certs) but emits a warning
//!    log at assembly time.
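//! 3. **Disabled mode is plaintext-OK.** When mode is
//!    [`RaftClusterMode::Disabled`] (single-node), the assembly
//!    function isn't called at all — the existing `LocalSqliteCommitter`
//!    is used directly. The gate is "openraft enabled".
//! 4. **OpenRaft mode requires the Raft write path.** The handler write
//!    path must be [`HandlerWritePath::RaftSubmitter`]; anything else
//!    fails assembly with [`AssemblyError::WritePathMismatch`].
//! 5. **OpenRaft mode requires at least 2 declared peers.** Fewer peers
//!    fail assembly with [`AssemblyError::InsufficientPeers`].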
//!
//! ## What this module does NOT include
//!
//! - The actual server.rs wiring that calls [`build_raft_cluster`] —
//!   that's a follow-up when the live cluster mode flag exists.
//! - Snapshot transport optimization (chunking, bincode) — see RFC 010
//!   PR-4 review notes.
//! - Linearizable reads via `Raft::ensure_linearizable()`.

use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;

use openraft::{Config, Raft};
use serde::{Deserialize, Serialize};
use thiserror::Error;

use super::committer::RaftCommitter;
use super::http_network::HttpRaftNetworkFactory;
use super::log_storage::SqliteRaftLogStorage;
use super::state_machine::YantrikStateMachine;
use super::types::{YantrikNode, YantrikNodeId, YantrikRaftTypeConfig};
use crate::commit::{Applier, MutationCommitter};
use crate::security::cluster_tls::{ClusterTlsConfig, ClusterTlsError};

/// Whether this server runs in cluster mode.
///
/// `Disabled` (the default) means single-node operation on the existing
/// `LocalSqliteCommitter`; `OpenRaft` means a real Raft cluster behind
/// the mTLS-required production gate. Operators must opt INTO cluster
/// mode explicitly.
///
/// Serialized in lowercase: `"disabled"` / `"openraft"`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum RaftClusterMode {
    /// Single-node — no cluster transport, no Raft. Application code
    /// holds an `Arc<LocalSqliteCommitter>` and serves writes locally.
    #[default]
    Disabled,
    /// Multi-node cluster via openraft. Assembly requires at least 2
    /// declared peers, the `RaftSubmitter` handler write path, and a
    /// fully-specified `cluster_tls` (`dev_mode = true` additionally
    /// relaxes peer-cert validation and logs a warning).
    OpenRaft,
}

/// What backs the HTTP handler write path.
///
/// RFC 010 PR-6.5 boot invariant gate: `OpenRaft` cluster mode REQUIRES
/// `RaftSubmitter` here. That pairing is checked at assembly time so the
/// cluster cannot regress to "cosmetic openraft" mode (writes land
/// locally, replication reports healthy but moves zero application
/// bytes).
///
/// The default is `LocalSqlite`, so configs that never mention a write
/// path keep their single-node behavior. Serialized in snake_case:
/// `"local_sqlite"` / `"raft_submitter"`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum HandlerWritePath {
    /// Handlers call into a `LocalSqliteSubmitter` (or directly into
    /// `engine.record()` on the legacy unmigrated path). Intended for
    /// single-node deployments (`RaftClusterMode::Disabled`); rejected
    /// at assembly time when combined with `RaftClusterMode::OpenRaft`.
    #[default]
    LocalSqlite,
    /// Handlers call into a `RaftSubmitter` that routes through openraft
    /// consensus. Required by `RaftClusterMode::OpenRaft`.
    RaftSubmitter,
}

/// Reasons Raft cluster assembly refuses to proceed.
///
/// Every variant is a boot-time failure: the configuration gates
/// (`MtlsRequired`, `WritePathMismatch`, `InsufficientPeers`) fire
/// before any certificate file is read, and the remaining variants
/// report failures while building the TLS client or the Raft instance.
#[derive(Debug, Error)]
pub enum AssemblyError {
    /// OpenRaft mode was requested but `cluster_tls` isn't fully
    /// specified. Fail-fast — refuses to start the server. `missing`
    /// names the first absent field so the message is actionable.
    #[error(
        "openraft mode requires fully-specified cluster_tls (cert_path, key_path, ca_path) \
         to prevent accidental plaintext cluster traffic; missing field: {missing}"
    )]
    MtlsRequired { missing: &'static str },

    /// `cluster_tls` failed to build into rustls / reqwest configs.
    #[error("cluster_tls build failed: {0}")]
    ClusterTls(#[from] ClusterTlsError),

    /// Building the reqwest client (identity, CA certificate, or the
    /// client itself) for inter-node Raft RPCs failed. The payload is
    /// the rendered error text.
    #[error("reqwest client build failed: {0}")]
    ReqwestBuild(String),

    /// openraft config validation or `Raft::new` returned an error
    /// during assembly. Indicates a fundamental misconfiguration —
    /// surfaced to the operator as rendered text.
    #[error("openraft Raft::new fatal: {0}")]
    RaftNew(String),

    /// Reading a cluster_tls cert/key/CA file as bytes for reqwest
    /// failed. `path` is the file that could not be read.
    #[error("read PEM file `{path}`: {source}")]
    PemRead {
        path: std::path::PathBuf,
        #[source]
        source: std::io::Error,
    },

    /// OpenRaft mode was requested but the handler write path is
    /// configured as something other than `RaftSubmitter`. This is the
    /// cosmetic-openraft regression gate: openraft can be assembled
    /// with a non-Raft handler path, but if the binary boots in that
    /// state it lies in `/v1/health` (reports `healthy: true`) while
    /// every write bypasses replication. Refuse at boot.
    ///
    /// `expected` is carried for programmatic inspection; the rendered
    /// message shows only `actual`.
    #[error(
        "openraft mode requires handler_write_path = \"raft_submitter\"; got {actual:?}. \
         Configure cluster.handler_write_path = \"raft_submitter\", or set \
         cluster.raft_mode = \"disabled\" for single-node deployments."
    )]
    WritePathMismatch {
        actual: HandlerWritePath,
        expected: HandlerWritePath,
    },

    /// OpenRaft mode was requested but the cluster declares fewer than
    /// `need` peers (`have` is the number actually declared, possibly
    /// zero). An undersized peer list is almost certainly a
    /// misconfiguration. Refuse at boot.
    // The message renders the threshold from `need` rather than
    // hard-coding it, so it stays accurate for `have = 0` and for any
    // future change to the minimum.
    #[error(
        "openraft mode requires at least {need} peers (got {have}). \
         A cluster that small has no quorum semantics; configure additional \
         peers in cluster.peers or set cluster.raft_mode = \"disabled\"."
    )]
    InsufficientPeers { have: usize, need: usize },
}

/// Inputs to [`build_raft_cluster`].
///
/// The boot invariants over `mode`, `write_path`, and `peers` are
/// checked by [`RaftAssemblyConfig::validate`]; the `cluster_tls`
/// completeness check runs separately inside [`build_raft_cluster`].
pub struct RaftAssemblyConfig {
    /// Cluster mode. The write-path and peer-count invariants are only
    /// enforced when this is [`RaftClusterMode::OpenRaft`].
    pub mode: RaftClusterMode,
    /// Local node id within the cluster.
    pub node_id: YantrikNodeId,
    /// HTTP address other peers reach this node at (e.g.
    /// `https://10.0.0.5:7100`). Stored in the membership log.
    ///
    /// NOTE(review): `build_raft_cluster` in this module never reads
    /// this field; presumably callers forward it to
    /// `initialize_single_node` — confirm against the server wiring.
    pub node_addr: String,
    /// Cluster TLS config — required when `mode == OpenRaft`. Ignored
    /// when `mode == Disabled`.
    pub cluster_tls: Option<ClusterTlsConfig>,
    /// Full cluster voter set (typically including this node's address).
    /// PR-6.5 boot invariant: `OpenRaft` mode requires `peers.len() >= 2`.
    /// Empty / 1-element peer lists are misconfiguration and rejected
    /// at boot. Only the length is inspected in this module; the
    /// addresses themselves are not parsed or contacted here.
    pub peers: Vec<String>,
    /// What backs the HTTP handler write path. PR-6.5 boot invariant:
    /// `OpenRaft` mode requires `RaftSubmitter` here.
    pub write_path: HandlerWritePath,
    /// RFC 010 PR-6.4 — engine apply path. Every committed mutation on
    /// every node (leader + followers) flows through this Applier so
    /// engine state stays in lock-step with the commit log.
    /// Production wiring uses
    /// [`crate::commit::EngineApplier`] over a
    /// [`crate::tenant_pool::TenantPoolEngineResolver`]; tests pass
    /// [`crate::commit::LocalApplier`] for trait-shape coverage.
    pub applier: Arc<dyn Applier>,
    /// Per-RPC timeout for the reqwest client. The same value is also
    /// handed to `HttpRaftNetworkFactory::new`.
    pub request_timeout: Duration,
    /// openraft heartbeat / election tuning. Validated via
    /// `Config::validate` during assembly.
    pub openraft_config: Config,
}

impl RaftAssemblyConfig {
    /// Starting template for a production OpenRaft node: 200ms
    /// heartbeat, 800ms-1.6s election timeout, 10s RPC timeout.
    ///
    /// The result is deliberately NOT ready to run: `cluster_tls` is
    /// `None` and `peers` is empty, so the operator has to fill both in
    /// before assembly can succeed.
    pub fn production_defaults(node_id: YantrikNodeId, node_addr: String) -> Self {
        // Timing knobs; everything else stays at openraft's defaults.
        let openraft_config = Config {
            cluster_name: "yantrikdb".into(),
            heartbeat_interval: 200,
            election_timeout_min: 800,
            election_timeout_max: 1600,
            ..Default::default()
        };

        Self {
            mode: RaftClusterMode::OpenRaft,
            node_id,
            node_addr,
            // Operator MUST supply TLS material.
            cluster_tls: None,
            // Operator MUST supply the peer set (PR-6.5 gate).
            peers: Vec::new(),
            // OpenRaft mode is only valid with the Raft write path.
            write_path: HandlerWritePath::RaftSubmitter,
            // Placeholder applier. Production callers replace this with
            // an EngineApplier wired through TenantPool; keeping the
            // placeholder produces a cluster that commits log entries
            // without ever applying them to engine state. Tests keep it
            // on purpose to exercise the trait shape without real
            // engines.
            applier: Arc::new(crate::commit::LocalApplier::new()),
            request_timeout: Duration::from_secs(10),
            openraft_config,
        }
    }

    /// Check this config against the PR-6.5 boot invariants.
    ///
    /// [`build_raft_cluster`] calls this before any sockets are opened.
    /// It is `pub(crate)` so tests can exercise the gate without
    /// standing up the full Raft + reqwest stack.
    ///
    /// When `mode` is `OpenRaft`, the checks run in this order:
    /// 1. the handler write path must be `RaftSubmitter`;
    /// 2. at least 2 peers must be declared.
    ///
    /// Any other mode passes unconditionally. `cluster_tls` is not
    /// examined here; [`build_raft_cluster`] checks it afterwards via
    /// [`validate_cluster_tls_for_openraft`].
    pub(crate) fn validate(&self) -> Result<(), AssemblyError> {
        const MIN_PEERS: usize = 2;

        // The gates only apply once openraft is switched on.
        if self.mode != RaftClusterMode::OpenRaft {
            return Ok(());
        }

        let expected = HandlerWritePath::RaftSubmitter;
        if self.write_path != expected {
            return Err(AssemblyError::WritePathMismatch {
                actual: self.write_path,
                expected,
            });
        }

        let have = self.peers.len();
        if have < MIN_PEERS {
            return Err(AssemblyError::InsufficientPeers {
                have,
                need: MIN_PEERS,
            });
        }

        Ok(())
    }
}

/// Result of a successful assembly, as returned by
/// [`build_raft_cluster`].
pub struct RaftAssembly {
    /// Shared handle to the assembled openraft instance.
    pub raft: Arc<Raft<YantrikRaftTypeConfig>>,
    /// Committer built over `raft` and the caller-supplied local
    /// committer.
    pub committer: RaftCommitter,
    /// The network factory built during assembly. The `Raft` instance
    /// was handed its own clone; this one is here so callers can build
    /// additional clients (e.g. for join-cluster CLI flows).
    pub network_factory: HttpRaftNetworkFactory,
}

/// Confirm that `cluster_tls` is complete enough for openraft mode.
///
/// Returns the borrowed config when the section is present and
/// `cert_path`, `key_path`, and `ca_path` are all set. Otherwise returns
/// [`AssemblyError::MtlsRequired`] naming the first missing piece (the
/// whole section, then the fields in the order listed above) so the
/// operator gets an actionable message. No files are touched here.
fn validate_cluster_tls_for_openraft(
    cluster_tls: Option<&ClusterTlsConfig>,
) -> Result<&ClusterTlsConfig, AssemblyError> {
    let tls = match cluster_tls {
        Some(tls) => tls,
        None => {
            return Err(AssemblyError::MtlsRequired {
                missing: "cluster_tls (entire section)",
            })
        }
    };

    // (field name, is it absent?) in the order the fields are reported.
    let required = [
        ("cert_path", tls.cert_path.is_none()),
        ("key_path", tls.key_path.is_none()),
        ("ca_path", tls.ca_path.is_none()),
    ];

    match required.iter().find(|(_, absent)| *absent) {
        Some(&(missing, _)) => Err(AssemblyError::MtlsRequired { missing }),
        None => Ok(tls),
    }
}

/// Build a reqwest client that does mTLS using the cluster certs.
///
/// Reads the certificate, private key, and CA PEM files named in `cfg`,
/// presents cert + key as the client identity, and trusts only the
/// cluster CA (built-in root certificates are disabled).
/// `request_timeout` becomes the client's request timeout. When
/// `cfg.dev_mode` is set, a warning is logged and invalid peer
/// certificates are accepted.
///
/// # Errors
///
/// - [`AssemblyError::MtlsRequired`] if any of the three paths is unset.
///   Callers normally run [`validate_cluster_tls_for_openraft`] first;
///   the re-check here means an unvalidated config produces an error
///   rather than a panic.
/// - [`AssemblyError::PemRead`] if a PEM file cannot be read.
/// - [`AssemblyError::ReqwestBuild`] if the identity, the CA
///   certificate, or the client itself fails to build.
fn build_reqwest_client_for_cluster(
    cfg: &ClusterTlsConfig,
    request_timeout: Duration,
) -> Result<reqwest::Client, AssemblyError> {
    /// Read one PEM file, tagging any I/O failure with the offending path.
    fn read_pem(path: &std::path::Path) -> Result<Vec<u8>, AssemblyError> {
        std::fs::read(path).map_err(|source| AssemblyError::PemRead {
            path: path.to_path_buf(),
            source,
        })
    }

    // Resolve all three paths before touching the filesystem so a
    // missing field is reported ahead of any read error.
    let cert_path = cfg.cert_path.as_ref().ok_or(AssemblyError::MtlsRequired {
        missing: "cert_path",
    })?;
    let key_path = cfg.key_path.as_ref().ok_or(AssemblyError::MtlsRequired {
        missing: "key_path",
    })?;
    let ca_path = cfg
        .ca_path
        .as_ref()
        .ok_or(AssemblyError::MtlsRequired { missing: "ca_path" })?;

    let cert_pem = read_pem(cert_path)?;
    let key_pem = read_pem(key_path)?;
    let ca_pem = read_pem(ca_path)?;

    // reqwest's Identity::from_pem accepts a single bundle (cert + key).
    // We concatenate so the operator can store them separately without
    // having to keep a "bundled" file in sync. The cert buffer is reused
    // as the bundle — it is not needed on its own afterwards.
    let mut bundle = cert_pem;
    bundle.push(b'\n');
    bundle.extend_from_slice(&key_pem);

    let identity = reqwest::Identity::from_pem(&bundle)
        .map_err(|e| AssemblyError::ReqwestBuild(format!("Identity::from_pem: {e}")))?;
    let ca_cert = reqwest::Certificate::from_pem(&ca_pem)
        .map_err(|e| AssemblyError::ReqwestBuild(format!("Certificate::from_pem: {e}")))?;

    let mut builder = reqwest::Client::builder()
        .timeout(request_timeout)
        .identity(identity)
        .add_root_certificate(ca_cert)
        // Don't auto-trust system roots: cluster traffic uses the
        // explicit cluster CA.
        .tls_built_in_root_certs(false);

    if cfg.dev_mode {
        tracing::warn!(
            "cluster_tls.dev_mode = true — accepting self-signed peer certs. \
             NEVER set this in production."
        );
        builder = builder.danger_accept_invalid_certs(true);
    }

    builder.build().map_err(|e| {
        // Surface the source chain — the bare reqwest::Error is just
        // "builder error" with the actual cause buried in the source chain.
        let mut chain = format!("build: {e}");
        let mut src = std::error::Error::source(&e);
        while let Some(s) = src {
            chain.push_str(&format!(" / {s}"));
            src = s.source();
        }
        AssemblyError::ReqwestBuild(chain)
    })
}

/// Assemble a working Raft cluster node from its parts, enforcing the
/// boot-time gates along the way.
///
/// Steps, in order:
/// 1. `cfg.validate()` — PR-6.5 write-path and peer-count invariants.
/// 2. `cluster_tls` completeness check (the mTLS gate, production
///    invariant #1), then construction of the mTLS reqwest client and
///    the HTTP network factory.
/// 3. openraft `Config::validate`.
/// 4. State machine + `Raft::new`, then the [`RaftCommitter`].
///
/// The config gates run before any certificate file is read, so a
/// misconfigured handler path surfaces as `WritePathMismatch` rather
/// than as a PEM read error. Note that the TLS check is applied
/// regardless of `cfg.mode`.
///
/// `local` is the same `MutationCommitter` the state machine apply path
/// drives — typically an `Arc<LocalSqliteCommitter>`. The returned
/// `RaftCommitter` routes writes through openraft and reads through
/// `local` (stale-OK semantics).
pub async fn build_raft_cluster(
    cfg: RaftAssemblyConfig,
    log_storage: SqliteRaftLogStorage,
    local: Arc<dyn MutationCommitter>,
) -> Result<RaftAssembly, AssemblyError> {
    // Cheap config gates first: they must win over cert IO failures.
    cfg.validate()?;

    // mTLS gate, then the transport built on top of it.
    let tls = validate_cluster_tls_for_openraft(cfg.cluster_tls.as_ref())?;
    let http_client = build_reqwest_client_for_cluster(tls, cfg.request_timeout)?;
    let network_factory = HttpRaftNetworkFactory::new(http_client, cfg.request_timeout);

    let raft_config = match cfg.openraft_config.validate() {
        Ok(validated) => Arc::new(validated),
        Err(e) => {
            return Err(AssemblyError::RaftNew(format!(
                "openraft Config::validate: {e}"
            )))
        }
    };

    // RFC 010 PR-6.4 — the Applier from `cfg.applier` drives the state
    // machine apply path. Production supplies an `EngineApplier` over a
    // `TenantPool`-backed resolver (wired in main.rs) so committed
    // mutations reach engine state on leader and followers alike; tests
    // supply `LocalApplier` to cover the trait shape without real
    // engines.
    let state_machine = YantrikStateMachine::new(Arc::clone(&local), cfg.applier);

    let raft = Raft::<YantrikRaftTypeConfig>::new(
        cfg.node_id,
        raft_config,
        network_factory.clone(),
        log_storage,
        state_machine,
    )
    .await
    .map(Arc::new)
    .map_err(|e| AssemblyError::RaftNew(e.to_string()))?;

    let committer = RaftCommitter::new(Arc::clone(&raft), local);

    Ok(RaftAssembly {
        raft,
        committer,
        network_factory,
    })
}

/// Bootstrap a brand-new cluster whose only member is this node.
///
/// Looks up the local node id from the assembly's current Raft metrics
/// and calls `Raft::initialize` with a one-entry membership map that
/// maps that id to `node_addr`. Used during cluster bootstrap
/// (`yantrikdb cluster init`).
///
/// To join an existing cluster, callers instead use `Raft::add_learner`
/// + `Raft::change_membership` against the existing leader.
///
/// # Errors
///
/// Returns whatever error `Raft::initialize` reports.
pub async fn initialize_single_node(
    assembly: &RaftAssembly,
    node_addr: String,
) -> Result<
    (),
    openraft::error::RaftError<
        YantrikNodeId,
        openraft::error::InitializeError<YantrikNodeId, YantrikNode>,
    >,
> {
    // Take a private copy of the metrics snapshot; the watch borrow is
    // released at the end of this statement, well before the await.
    let local_id = assembly.raft.metrics().borrow().clone().id;

    let members = BTreeMap::from([(local_id, YantrikNode::new(node_addr))]);
    assembly.raft.initialize(members).await
}

// Unit tests for the assembly gates: the cluster_tls completeness
// check, the PR-6.5 write-path / peer-count invariants, the defaults of
// the two config enums, and the ordering of checks inside
// `build_raft_cluster`. None of these tests reach a successful
// assembly; each asserts either a specific error variant or a
// successful `validate()`.
#[cfg(test)]
mod tests {
    use super::*;

    /// A `ClusterTlsConfig` with every field at its `Default` value.
    fn empty_tls() -> ClusterTlsConfig {
        ClusterTlsConfig::default()
    }

    /// Build a non-dev-mode `ClusterTlsConfig` from optional path
    /// strings. The paths are never opened by this helper.
    fn tls_with(cert: Option<&str>, key: Option<&str>, ca: Option<&str>) -> ClusterTlsConfig {
        ClusterTlsConfig {
            cert_path: cert.map(std::path::PathBuf::from),
            key_path: key.map(std::path::PathBuf::from),
            ca_path: ca.map(std::path::PathBuf::from),
            dev_mode: false,
            rotate_check_secs: 60,
        }
    }

    #[test]
    fn openraft_mode_rejects_missing_cluster_tls_section() {
        let err = validate_cluster_tls_for_openraft(None).unwrap_err();
        match err {
            AssemblyError::MtlsRequired { missing } => assert!(missing.contains("cluster_tls")),
            other => panic!("expected MtlsRequired, got {other:?}"),
        }
    }

    #[test]
    fn openraft_mode_rejects_empty_tls_config() {
        // With nothing set, cert_path is the first field reported.
        let cfg = empty_tls();
        let err = validate_cluster_tls_for_openraft(Some(&cfg)).unwrap_err();
        match err {
            AssemblyError::MtlsRequired { missing } => assert_eq!(missing, "cert_path"),
            other => panic!("expected MtlsRequired, got {other:?}"),
        }
    }

    #[test]
    fn openraft_mode_rejects_missing_key() {
        let cfg = tls_with(Some("/tmp/cert.pem"), None, Some("/tmp/ca.pem"));
        let err = validate_cluster_tls_for_openraft(Some(&cfg)).unwrap_err();
        match err {
            AssemblyError::MtlsRequired { missing } => assert_eq!(missing, "key_path"),
            other => panic!("expected MtlsRequired, got {other:?}"),
        }
    }

    #[test]
    fn openraft_mode_rejects_missing_ca() {
        let cfg = tls_with(Some("/tmp/cert.pem"), Some("/tmp/key.pem"), None);
        let err = validate_cluster_tls_for_openraft(Some(&cfg)).unwrap_err();
        match err {
            AssemblyError::MtlsRequired { missing } => assert_eq!(missing, "ca_path"),
            other => panic!("expected MtlsRequired, got {other:?}"),
        }
    }

    #[test]
    fn openraft_mode_accepts_fully_specified_tls() {
        // Validation only checks that the paths are present; the files
        // themselves are not read at this stage.
        let cfg = tls_with(
            Some("/tmp/cert.pem"),
            Some("/tmp/key.pem"),
            Some("/tmp/ca.pem"),
        );
        validate_cluster_tls_for_openraft(Some(&cfg))
            .expect("fully-specified config must pass validation");
    }

    #[tokio::test]
    async fn build_raft_cluster_fails_on_missing_cluster_tls() {
        // The whole point of the gate. Even if everything else is ready
        // to go, no cluster_tls means the assembly refuses.
        let local = Arc::new(crate::commit::LocalSqliteCommitter::open_in_memory().unwrap())
            as Arc<dyn MutationCommitter>;
        let log_storage = SqliteRaftLogStorage::open_in_memory();
        let cfg = RaftAssemblyConfig {
            mode: RaftClusterMode::OpenRaft,
            node_id: YantrikNodeId::new(1),
            node_addr: "https://127.0.0.1:7100".into(),
            cluster_tls: None,
            peers: vec![
                "https://127.0.0.1:7100".into(),
                "https://127.0.0.1:7101".into(),
            ],
            write_path: HandlerWritePath::RaftSubmitter,
            applier: Arc::new(crate::commit::LocalApplier::new()),
            request_timeout: Duration::from_secs(1),
            openraft_config: Config::default(),
        };
        match build_raft_cluster(cfg, log_storage, local).await {
            Err(AssemblyError::MtlsRequired { .. }) => {}
            Err(other) => panic!("expected MtlsRequired, got {other:?}"),
            Ok(_) => panic!("expected MtlsRequired, assembly succeeded"),
        }
    }

    #[tokio::test]
    async fn build_raft_cluster_fails_on_unreadable_cert_files() {
        // Paths exist syntactically but point at non-existent files.
        // Validation passes (all three paths supplied), but
        // build_reqwest_client_for_cluster errors out on read.
        let local = Arc::new(crate::commit::LocalSqliteCommitter::open_in_memory().unwrap())
            as Arc<dyn MutationCommitter>;
        let log_storage = SqliteRaftLogStorage::open_in_memory();
        let cluster_tls = tls_with(
            Some("/nonexistent/cert.pem"),
            Some("/nonexistent/key.pem"),
            Some("/nonexistent/ca.pem"),
        );
        let cfg = RaftAssemblyConfig {
            mode: RaftClusterMode::OpenRaft,
            node_id: YantrikNodeId::new(1),
            node_addr: "https://127.0.0.1:7100".into(),
            cluster_tls: Some(cluster_tls),
            peers: vec![
                "https://127.0.0.1:7100".into(),
                "https://127.0.0.1:7101".into(),
            ],
            write_path: HandlerWritePath::RaftSubmitter,
            applier: Arc::new(crate::commit::LocalApplier::new()),
            request_timeout: Duration::from_secs(1),
            openraft_config: Config::default(),
        };
        match build_raft_cluster(cfg, log_storage, local).await {
            Err(AssemblyError::PemRead { .. }) => {}
            Err(other) => panic!("expected PemRead, got {other:?}"),
            Ok(_) => panic!("expected PemRead, assembly succeeded"),
        }
    }

    #[test]
    fn cluster_mode_default_is_disabled() {
        // Operators must opt INTO cluster mode. A fresh server config
        // with no cluster section runs single-node — no plaintext gate
        // can be tripped by accident.
        assert_eq!(RaftClusterMode::default(), RaftClusterMode::Disabled);
    }

    #[test]
    fn production_defaults_demand_explicit_tls() {
        // production_defaults() returns mode=OpenRaft + cluster_tls=None.
        // Operator must explicitly supply cluster_tls before assembly
        // succeeds — the defaults exist as a starting template, not a
        // ready-to-run config.
        let d = RaftAssemblyConfig::production_defaults(
            YantrikNodeId::new(1),
            "https://10.0.0.1:7100".into(),
        );
        assert_eq!(d.mode, RaftClusterMode::OpenRaft);
        assert!(
            d.cluster_tls.is_none(),
            "production_defaults must NOT bake in any cluster_tls — operator supplies it"
        );
    }

    // ── PR-6.5 boot invariant tests ─────────────────────────────────

    /// Build a config that varies only in mode, write path, and peer
    /// list. `cluster_tls` is fully specified (with paths that are not
    /// expected to exist), so a failure observed through this config
    /// comes from the PR-6.5 gates rather than the TLS completeness
    /// check.
    fn cfg_for(
        mode: RaftClusterMode,
        write_path: HandlerWritePath,
        peers: Vec<String>,
    ) -> RaftAssemblyConfig {
        RaftAssemblyConfig {
            mode,
            node_id: YantrikNodeId::new(1),
            node_addr: "https://10.0.0.1:7100".into(),
            cluster_tls: Some(tls_with(
                Some("/tmp/cert.pem"),
                Some("/tmp/key.pem"),
                Some("/tmp/ca.pem"),
            )),
            peers,
            write_path,
            applier: Arc::new(crate::commit::LocalApplier::new()),
            request_timeout: Duration::from_secs(1),
            openraft_config: Config::default(),
        }
    }

    /// A three-address peer list that comfortably clears the 2-peer
    /// minimum.
    fn three_peer_set() -> Vec<String> {
        vec![
            "https://10.0.0.1:7100".into(),
            "https://10.0.0.2:7100".into(),
            "https://10.0.0.3:7100".into(),
        ]
    }

    #[test]
    fn pr_6_5_openraft_with_localsqlite_write_path_is_rejected() {
        // The cosmetic-openraft regression gate. If this test ever
        // accepts the misconfiguration, the whole point of PR 6.5 is
        // gone — refuse the boot, not eventually surface a 503 in
        // /v1/health.
        let cfg = cfg_for(
            RaftClusterMode::OpenRaft,
            HandlerWritePath::LocalSqlite,
            three_peer_set(),
        );
        match cfg.validate() {
            Err(AssemblyError::WritePathMismatch { actual, expected }) => {
                assert_eq!(actual, HandlerWritePath::LocalSqlite);
                assert_eq!(expected, HandlerWritePath::RaftSubmitter);
            }
            other => panic!("expected WritePathMismatch, got {other:?}"),
        }
    }

    #[test]
    fn pr_6_5_openraft_with_empty_peers_is_rejected() {
        let cfg = cfg_for(
            RaftClusterMode::OpenRaft,
            HandlerWritePath::RaftSubmitter,
            vec![],
        );
        match cfg.validate() {
            Err(AssemblyError::InsufficientPeers { have, need }) => {
                assert_eq!(have, 0);
                assert_eq!(need, 2);
            }
            other => panic!("expected InsufficientPeers, got {other:?}"),
        }
    }

    #[test]
    fn pr_6_5_openraft_with_one_peer_is_rejected() {
        // 1-peer "cluster" has no quorum semantics — almost certainly a
        // misconfiguration where the operator forgot to add the others.
        let cfg = cfg_for(
            RaftClusterMode::OpenRaft,
            HandlerWritePath::RaftSubmitter,
            vec!["https://10.0.0.1:7100".into()],
        );
        assert!(matches!(
            cfg.validate(),
            Err(AssemblyError::InsufficientPeers { have: 1, need: 2 })
        ));
    }

    #[test]
    fn pr_6_5_openraft_with_two_peers_passes() {
        // Two-voter cluster (e.g. .140 + .141 in the homelab) is
        // intentionally permitted as the minimum viable cluster.
        let cfg = cfg_for(
            RaftClusterMode::OpenRaft,
            HandlerWritePath::RaftSubmitter,
            vec![
                "https://10.0.0.1:7100".into(),
                "https://10.0.0.2:7100".into(),
            ],
        );
        cfg.validate()
            .expect("two-peer openraft cluster must validate");
    }

    #[test]
    fn pr_6_5_disabled_mode_does_not_demand_peers() {
        // Single-node mode is plaintext-OK, peers-OK, write-path-OK in
        // any combination. The gate only fires when openraft is on.
        let cfg = cfg_for(
            RaftClusterMode::Disabled,
            HandlerWritePath::LocalSqlite,
            vec![],
        );
        cfg.validate()
            .expect("single-node mode must validate without peers");
    }

    #[test]
    fn pr_6_5_disabled_mode_with_raft_submitter_is_currently_permitted() {
        // Operator declared cluster.write_path = "raft_submitter" but
        // mode = "disabled" — this is a no-op declaration in single-node
        // mode (RaftSubmitter has no Raft to submit through). PR 6.5
        // doesn't reject it because nothing's broken on disk; future
        // PRs may surface a warning log if the combination becomes
        // ambiguous in practice.
        let cfg = cfg_for(
            RaftClusterMode::Disabled,
            HandlerWritePath::RaftSubmitter,
            vec![],
        );
        cfg.validate()
            .expect("Disabled+RaftSubmitter is permitted (no-op declaration)");
    }

    #[tokio::test]
    async fn pr_6_5_build_raft_cluster_runs_validate_first() {
        // The load-bearing wiring assertion: build_raft_cluster MUST
        // run validate() before reading TLS files. Otherwise a misconfigured
        // write_path could surface as a confusing PEM read error instead
        // of the actionable WritePathMismatch.
        let local = Arc::new(crate::commit::LocalSqliteCommitter::open_in_memory().unwrap())
            as Arc<dyn MutationCommitter>;
        let log_storage = SqliteRaftLogStorage::open_in_memory();
        let cfg = cfg_for(
            RaftClusterMode::OpenRaft,
            HandlerWritePath::LocalSqlite, // mismatched
            three_peer_set(),
        );
        match build_raft_cluster(cfg, log_storage, local).await {
            Err(AssemblyError::WritePathMismatch { .. }) => {}
            Err(other) => panic!("expected WritePathMismatch, got {other:?}"),
            Ok(_) => panic!("expected WritePathMismatch, assembly succeeded"),
        }
    }

    #[test]
    fn handler_write_path_default_is_local_sqlite() {
        // Backwards-compat: any existing config that doesn't specify
        // write_path keeps its single-node behavior. Operators must
        // explicitly opt INTO RaftSubmitter when enabling openraft.
        assert_eq!(HandlerWritePath::default(), HandlerWritePath::LocalSqlite);
    }

    #[test]
    fn production_defaults_pair_openraft_with_raft_submitter() {
        // production_defaults() pairs OpenRaft mode with RaftSubmitter
        // write path so the template is internally consistent.
        // Operator still needs to fill cluster_tls + peers before
        // validation passes.
        let d = RaftAssemblyConfig::production_defaults(
            YantrikNodeId::new(1),
            "https://10.0.0.1:7100".into(),
        );
        assert_eq!(d.write_path, HandlerWritePath::RaftSubmitter);
        assert!(d.peers.is_empty(), "operator MUST supply peers");
    }
}