// rivven_cluster/raft.rs
//! Raft integration for cluster metadata consensus
//!
//! This module provides the Raft consensus implementation for cluster metadata.
//! It uses **redb** (pure Rust) for persistent log storage and wraps our `ClusterMetadata`
//! state machine with full openraft integration.
//!
//! # Architecture
//!
//! - **TypeConfig**: Defines all Raft-related types (node ID, entry, etc.)
//! - **LogStore**: redb-backed log storage implementing `RaftLogStorage`
//! - **StateMachine**: Wraps `ClusterMetadata` implementing `RaftStateMachine`
//! - **NetworkFactory**: Creates HTTP-based network connections for Raft RPCs
//! - **RaftNode**: High-level API managing the Raft instance
//!
//! # Why redb?
//!
//! We use redb instead of RocksDB for several benefits:
//! - **Pure Rust**: Zero C/C++ dependencies, compiles everywhere
//! - **Fast builds**: ~10s vs 2-5 minutes for RocksDB
//! - **Cross-compile**: Works with musl, WASM, etc.
//! - **ACID**: Full transactional guarantees
//! - **Small binary**: Minimal size impact
24// Suppress warnings for large error types from openraft crate
25#![allow(clippy::result_large_err)]
26
27use crate::config::ClusterConfig;
28use crate::error::{ClusterError, Result};
29use crate::metadata::{ClusterMetadata, MetadataCommand, MetadataResponse};
30use crate::storage::RedbLogStore;
31use openraft::network::{RPCOption, RaftNetwork, RaftNetworkFactory};
32use openraft::raft::responder::OneshotResponder;
33use openraft::storage::{RaftStateMachine, Snapshot};
34use openraft::{
35    BasicNode, Entry, EntryPayload, LogId, Membership, RaftTypeConfig, SnapshotMeta, StorageError,
36    StorageIOError, StoredMembership, Vote,
37};
38use serde::{Deserialize, Serialize};
39use std::collections::BTreeMap;
40use std::fmt::Debug;
41use std::path::PathBuf;
42use std::sync::Arc;
43use tokio::io::{AsyncReadExt, AsyncSeekExt};
44use tokio::sync::RwLock;
45use tracing::{debug, info, warn};
46
/// Type alias for backward compatibility - uses redb storage
///
/// Older code referred to `LogStore` directly; it now resolves to the
/// redb-backed implementation in `crate::storage`.
pub type LogStore = RedbLogStore;
49
50// ============================================================================
51// Error Types
52// ============================================================================
53
/// Simple error wrapper for network errors that implements std::error::Error
///
/// Holds the error text verbatim; `Display` emits it unchanged so callers can
/// surface the original network failure message.
#[derive(Debug)]
struct NetworkErrorWrapper(String);

impl std::fmt::Display for NetworkErrorWrapper {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Write the stored message verbatim (no extra formatting needed).
        f.write_str(&self.0)
    }
}

impl std::error::Error for NetworkErrorWrapper {}
65
66// ============================================================================
67// Type Configuration
68// ============================================================================
69
/// Raft node ID type
///
/// A plain `u64` uniquely identifying a Raft peer within the cluster.
pub type NodeId = u64;
72
/// Application request type for the state machine
///
/// A thin wrapper around [`MetadataCommand`] so the metadata command enum can
/// travel through the Raft log as `TypeConfig::D`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RaftRequest {
    /// The metadata mutation to replicate and apply.
    pub command: MetadataCommand,
}
78
/// Application response type from the state machine
///
/// A thin wrapper around [`MetadataResponse`], returned per applied entry as
/// `TypeConfig::R`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RaftResponse {
    /// Result of applying the corresponding command.
    pub response: MetadataResponse,
}
84
/// Type configuration for Rivven's Raft implementation
///
/// Zero-sized marker type; all the interesting choices live in the
/// `RaftTypeConfig` impl below. Serde derives are gated on the `raft`
/// feature via `cfg_attr`.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Ord, PartialOrd)]
#[cfg_attr(feature = "raft", derive(Serialize, Deserialize))]
pub struct TypeConfig;
89
impl RaftTypeConfig for TypeConfig {
    /// Application data carried in log entries: a wrapped `MetadataCommand`.
    type D = RaftRequest;
    /// Application response produced per applied entry.
    type R = RaftResponse;
    type NodeId = NodeId;
    type Node = BasicNode;
    type Entry = Entry<TypeConfig>;
    /// File-backed snapshot I/O (§2.5 — best-in-class).
    ///
    /// Snapshots are persisted to `{data_dir}/snapshots/{snapshot_id}.snap` using
    /// `tokio::fs::File`. openraft reads/writes snapshot data in chunks via
    /// `AsyncRead`/`AsyncWrite`/`AsyncSeek` — the snapshot itself NEVER appears
    /// in any serialized RPC (`InstallSnapshotRequest` transports raw `Vec<u8>`
    /// chunks).
    ///
    /// Benefits over `Cursor<Vec<u8>>`:
    /// - **Persistent**: Snapshots survive process restarts — avoids replaying
    ///   the entire Raft log on startup.
    /// - **Streaming**: Large snapshots are read/written in chunks rather than
    ///   materializing the entire blob in memory at once.
    /// - **Disk-backed**: Memory pressure is bounded by the chunk size, not the
    ///   total snapshot size.
    type SnapshotData = tokio::fs::File;
    type AsyncRuntime = openraft::TokioRuntime;
    /// One-shot responder used to deliver a client reply per log entry.
    type Responder = OneshotResponder<TypeConfig>;
}
115
/// Maximum allowed snapshot size (§2.5 guard). If exceeded, snapshot creation
/// returns an error instead of silently allocating unbounded memory.
/// The same bound is enforced on incoming snapshots in `install_snapshot`.
const MAX_SNAPSHOT_SIZE: usize = 64 * 1024 * 1024; // 64 MB
119
// Type aliases for convenience — openraft generics instantiated with this
// module's `NodeId` / `BasicNode` / `TypeConfig`.
pub type RaftLogId = LogId<NodeId>;
pub type RaftVote = Vote<NodeId>;
pub type RaftEntry = Entry<TypeConfig>;
pub type RaftMembership = Membership<NodeId, BasicNode>;
pub type RaftStoredMembership = StoredMembership<NodeId, BasicNode>;
pub type RaftSnapshot = Snapshot<TypeConfig>;
pub type RaftSnapshotMeta = SnapshotMeta<NodeId, BasicNode>;
128
129// ============================================================================
130// State Machine Implementation
131// ============================================================================
132
/// State machine wrapping ClusterMetadata
///
/// All fields are behind independent `tokio::sync::RwLock`s; methods that
/// need a consistent multi-field view acquire the locks in a fixed order
/// (metadata → last_applied → membership) to avoid deadlock and torn reads.
pub struct StateMachine {
    /// The actual metadata state
    metadata: RwLock<ClusterMetadata>,
    /// Last applied log ID
    last_applied: RwLock<Option<RaftLogId>>,
    /// Current membership
    membership: RwLock<RaftStoredMembership>,
    /// Directory for persisted snapshot files
    snapshot_dir: PathBuf,
    /// ID of the most recent snapshot (for `get_current_snapshot`)
    current_snapshot_id: RwLock<Option<String>>,
    /// Metadata of the most recent snapshot
    current_snapshot_meta: RwLock<Option<RaftSnapshotMeta>>,
}
148
149impl StateMachine {
150    /// Create new state machine with a snapshot directory
151    pub fn new(snapshot_dir: impl Into<PathBuf>) -> Self {
152        let dir = snapshot_dir.into();
153        if let Err(e) = std::fs::create_dir_all(&dir) {
154            warn!(
155                path = %dir.display(),
156                error = %e,
157                "Failed to create snapshot directory; snapshots may not persist"
158            );
159        }
160        Self {
161            metadata: RwLock::new(ClusterMetadata::new()),
162            last_applied: RwLock::new(None),
163            membership: RwLock::new(StoredMembership::new(None, Membership::new(vec![], ()))),
164            snapshot_dir: dir,
165            current_snapshot_id: RwLock::new(None),
166            current_snapshot_meta: RwLock::new(None),
167        }
168    }
169
170    /// Create state machine for legacy callers (in-memory, temp dir for snapshots)
171    pub fn new_default() -> Self {
172        Self::new(std::env::temp_dir().join("rivven-snapshots"))
173    }
174
175    /// Get current metadata (read-only)
176    pub async fn metadata(&self) -> tokio::sync::RwLockReadGuard<'_, ClusterMetadata> {
177        self.metadata.read().await
178    }
179
180    /// Apply a command to the state machine
181    async fn apply_command(&self, log_id: &RaftLogId, command: MetadataCommand) -> RaftResponse {
182        let mut metadata = self.metadata.write().await;
183        let response = metadata.apply(log_id.index, command);
184        *self.last_applied.write().await = Some(*log_id);
185        RaftResponse { response }
186    }
187
188    /// Create a snapshot
189    ///
190    /// Serializes the state machine to disk for persistence and returns the
191    /// snapshot metadata + file path. The snapshot file is written atomically
192    /// (write-to-temp then rename) to prevent corruption.
193    ///
194    /// All three fields are read under coordinated locking to produce
195    /// a consistent snapshot. We acquire locks in a fixed order
196    /// (metadata → last_applied → membership) and hold them together
197    /// so that no interleaving `apply` can mutate state between reads.
198    async fn create_snapshot(
199        &self,
200    ) -> std::result::Result<(RaftSnapshotMeta, PathBuf), StorageError<NodeId>> {
201        // Acquire all read locks together to get a consistent view.
202        let metadata_guard = self.metadata.read().await;
203        let last_applied_guard = self.last_applied.read().await;
204        let membership_guard = self.membership.read().await;
205
206        let metadata = metadata_guard.clone();
207        let last_applied = *last_applied_guard;
208        let membership = membership_guard.clone();
209
210        // Release all locks before serialization (which can be expensive)
211        drop(membership_guard);
212        drop(last_applied_guard);
213        drop(metadata_guard);
214
215        let snapshot_data = SnapshotData {
216            metadata: metadata.clone(),
217            last_applied,
218            membership: membership.clone(),
219        };
220
221        let data = postcard::to_allocvec(&snapshot_data).map_err(|e| StorageError::IO {
222            source: StorageIOError::read_state_machine(openraft::AnyError::new(&e)),
223        })?;
224
225        // §2.5 guard: Reject snapshots that exceed the size limit
226        if data.len() > MAX_SNAPSHOT_SIZE {
227            return Err(StorageError::IO {
228                source: StorageIOError::read_state_machine(openraft::AnyError::new(
229                    &std::io::Error::other(format!(
230                        "Snapshot too large: {} bytes > {} byte limit",
231                        data.len(),
232                        MAX_SNAPSHOT_SIZE
233                    )),
234                )),
235            });
236        }
237
238        let snapshot_id = format!("snapshot-{}", metadata.last_applied_index);
239
240        let meta = SnapshotMeta {
241            last_log_id: snapshot_data.last_applied,
242            last_membership: membership,
243            snapshot_id: snapshot_id.clone(),
244        };
245
246        // Write snapshot to disk atomically: temp file → rename.
247        // The file format is: [data...][crc32:4 bytes LE]
248        // The trailing CRC-32 protects against bit-rot and truncation.
249        let snap_path = self.snapshot_dir.join(format!("{}.snap", snapshot_id));
250        let tmp_path = self.snapshot_dir.join(format!("{}.snap.tmp", snapshot_id));
251
252        let crc = crc32fast::hash(&data);
253        let mut file_data = data.clone();
254        file_data.extend_from_slice(&crc.to_le_bytes());
255
256        tokio::fs::write(&tmp_path, &file_data)
257            .await
258            .map_err(|e| StorageError::IO {
259                source: StorageIOError::write_snapshot(Some(meta.signature()), &e),
260            })?;
261
262        tokio::fs::rename(&tmp_path, &snap_path)
263            .await
264            .map_err(|e| StorageError::IO {
265                source: StorageIOError::write_snapshot(Some(meta.signature()), &e),
266            })?;
267
268        // Update current snapshot tracking
269        *self.current_snapshot_id.write().await = Some(snapshot_id.clone());
270        *self.current_snapshot_meta.write().await = Some(meta.clone());
271
272        // Clean up old snapshot files (keep latest 3)
273        self.cleanup_old_snapshots(3).await;
274
275        info!(
276            snapshot_id = %meta.snapshot_id,
277            last_log_id = ?meta.last_log_id,
278            size_bytes = data.len(),
279            path = %snap_path.display(),
280            "Created file-backed snapshot"
281        );
282
283        Ok((meta, snap_path))
284    }
285
286    /// Install a snapshot from deserialized data
287    ///
288    /// All three fields are updated while holding all write locks
289    /// simultaneously (lock ordering: metadata → last_applied → membership)
290    /// so that concurrent readers never observe partially-installed state.
291    ///
292    /// The data is expected in the snapshot file format: `[payload][crc32:4 LE]`.
293    /// If the data doesn't contain a valid CRC trailer (e.g. legacy snapshots
294    /// shorter than 5 bytes), it is deserialized directly without verification.
295    async fn install_snapshot_data(
296        &self,
297        data: &[u8],
298    ) -> std::result::Result<(), StorageError<NodeId>> {
299        // Verify CRC-32 integrity if the file is large enough to contain one.
300        let payload = if data.len() > 4 {
301            let (payload, crc_bytes) = data.split_at(data.len() - 4);
302            let stored_crc = u32::from_le_bytes(crc_bytes.try_into().unwrap());
303            let actual_crc = crc32fast::hash(payload);
304            if stored_crc != actual_crc {
305                return Err(StorageError::IO {
306                    source: StorageIOError::read_state_machine(openraft::AnyError::new(
307                        &std::io::Error::other(format!(
308                            "Snapshot CRC mismatch: stored={:#010x} actual={:#010x} — file is corrupt",
309                            stored_crc, actual_crc
310                        )),
311                    )),
312                });
313            }
314            payload
315        } else {
316            data
317        };
318
319        let snapshot_data: SnapshotData =
320            postcard::from_bytes(payload).map_err(|e| StorageError::IO {
321                source: StorageIOError::read_state_machine(openraft::AnyError::new(&e)),
322            })?;
323
324        // Acquire all write locks before mutating any field to ensure
325        // atomicity. Lock ordering matches create_snapshot / get_snapshot_builder.
326        let mut metadata_guard = self.metadata.write().await;
327        let mut last_applied_guard = self.last_applied.write().await;
328        let mut membership_guard = self.membership.write().await;
329
330        *metadata_guard = snapshot_data.metadata;
331        *last_applied_guard = snapshot_data.last_applied;
332        *membership_guard = snapshot_data.membership;
333
334        // Guards are dropped here, releasing all locks together.
335        drop(membership_guard);
336        drop(last_applied_guard);
337        drop(metadata_guard);
338
339        info!("Installed snapshot from data");
340        Ok(())
341    }
342
343    /// Load the latest snapshot from disk (called during startup).
344    ///
345    /// This avoids replaying the entire Raft log by restoring the most recent
346    /// snapshot into the state machine.
347    pub async fn load_latest_snapshot(&self) -> std::result::Result<bool, StorageError<NodeId>> {
348        let latest = self.find_latest_snapshot_file().await;
349        let Some(path) = latest else {
350            debug!("No snapshot files found in {}", self.snapshot_dir.display());
351            return Ok(false);
352        };
353
354        let data = tokio::fs::read(&path).await.map_err(|e| StorageError::IO {
355            source: StorageIOError::read_state_machine(openraft::AnyError::new(&e)),
356        })?;
357
358        self.install_snapshot_data(&data).await?;
359
360        // Extract snapshot ID from filename
361        let snapshot_id = path
362            .file_stem()
363            .and_then(|s| s.to_str())
364            .unwrap_or("unknown")
365            .to_string();
366
367        info!(
368            snapshot_id = %snapshot_id,
369            size_bytes = data.len(),
370            path = %path.display(),
371            "Restored state machine from snapshot file"
372        );
373
374        *self.current_snapshot_id.write().await = Some(snapshot_id);
375        Ok(true)
376    }
377
378    /// Find the latest snapshot file by parsing the index from filenames.
379    async fn find_latest_snapshot_file(&self) -> Option<PathBuf> {
380        let mut entries = match tokio::fs::read_dir(&self.snapshot_dir).await {
381            Ok(e) => e,
382            Err(_) => return None,
383        };
384
385        let mut best: Option<(u64, PathBuf)> = None;
386
387        while let Ok(Some(entry)) = entries.next_entry().await {
388            let path = entry.path();
389            if path.extension().and_then(|e| e.to_str()) != Some("snap") {
390                continue;
391            }
392            // Parse "snapshot-{index}.snap" → index
393            let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
394            if let Some(idx_str) = stem.strip_prefix("snapshot-") {
395                if let Ok(idx) = idx_str.parse::<u64>() {
396                    if best.as_ref().is_none_or(|(best_idx, _)| idx > *best_idx) {
397                        best = Some((idx, path));
398                    }
399                }
400            }
401        }
402
403        best.map(|(_, p)| p)
404    }
405
406    /// Remove old snapshot files, keeping the `keep` most recent.
407    async fn cleanup_old_snapshots(&self, keep: usize) {
408        let mut entries = match tokio::fs::read_dir(&self.snapshot_dir).await {
409            Ok(e) => e,
410            Err(_) => return,
411        };
412
413        let mut snaps: Vec<(u64, PathBuf)> = Vec::new();
414        while let Ok(Some(entry)) = entries.next_entry().await {
415            let path = entry.path();
416            if path.extension().and_then(|e| e.to_str()) != Some("snap") {
417                continue;
418            }
419            let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
420            if let Some(idx_str) = stem.strip_prefix("snapshot-") {
421                if let Ok(idx) = idx_str.parse::<u64>() {
422                    snaps.push((idx, path));
423                }
424            }
425        }
426
427        if snaps.len() <= keep {
428            return;
429        }
430
431        snaps.sort_by_key(|(idx, _)| *idx);
432        let to_remove = snaps.len() - keep;
433        for (_, path) in snaps.into_iter().take(to_remove) {
434            if let Err(e) = tokio::fs::remove_file(&path).await {
435                warn!(path = %path.display(), error = %e, "Failed to remove old snapshot");
436            } else {
437                debug!(path = %path.display(), "Removed old snapshot file");
438            }
439        }
440    }
441}
442
443impl Default for StateMachine {
444    fn default() -> Self {
445        Self::new_default()
446    }
447}
448
/// Snapshot data format
///
/// The postcard-serialized payload of a snapshot: the full metadata state plus
/// the Raft bookkeeping captured at the same instant. The on-disk file appends
/// a 4-byte CRC-32 trailer after this payload; the trailer is not part of
/// this struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SnapshotData {
    /// Cluster metadata state at snapshot time
    metadata: ClusterMetadata,
    /// Log ID of the last applied entry at snapshot time
    last_applied: Option<RaftLogId>,
    /// Effective membership at snapshot time
    membership: RaftStoredMembership,
}
456
457// Implement RaftStateMachine for StateMachine
458impl RaftStateMachine<TypeConfig> for StateMachine {
459    type SnapshotBuilder = Self;
460
461    async fn applied_state(
462        &mut self,
463    ) -> std::result::Result<(Option<RaftLogId>, RaftStoredMembership), StorageError<NodeId>> {
464        let last_applied = *self.last_applied.read().await;
465        let membership = self.membership.read().await.clone();
466        Ok((last_applied, membership))
467    }
468
469    async fn apply<I>(
470        &mut self,
471        entries: I,
472    ) -> std::result::Result<Vec<RaftResponse>, StorageError<NodeId>>
473    where
474        I: IntoIterator<Item = RaftEntry> + Send,
475        I::IntoIter: Send,
476    {
477        let mut responses = Vec::new();
478
479        for entry in entries {
480            let log_id = entry.log_id;
481
482            match entry.payload {
483                EntryPayload::Blank => {
484                    // Blank entry - just update last_applied
485                    *self.last_applied.write().await = Some(log_id);
486                    responses.push(RaftResponse {
487                        response: MetadataResponse::Success,
488                    });
489                }
490                EntryPayload::Normal(req) => {
491                    // Normal command
492                    let response = self.apply_command(&log_id, req.command).await;
493                    responses.push(response);
494                }
495                EntryPayload::Membership(membership) => {
496                    // Membership change
497                    *self.membership.write().await =
498                        StoredMembership::new(Some(log_id), membership);
499                    *self.last_applied.write().await = Some(log_id);
500                    responses.push(RaftResponse {
501                        response: MetadataResponse::Success,
502                    });
503                }
504            }
505        }
506
507        Ok(responses)
508    }
509
510    async fn begin_receiving_snapshot(
511        &mut self,
512    ) -> std::result::Result<Box<tokio::fs::File>, StorageError<NodeId>> {
513        // Create a temporary file for receiving snapshot chunks.
514        // openraft will write chunks to this file via AsyncWrite.
515        // Use PID + random suffix to avoid collisions between concurrent
516        // snapshot transfers or multiple processes sharing the same directory.
517        let unique_id = format!(
518            "incoming-{}-{}.snap.tmp",
519            std::process::id(),
520            uuid::Uuid::new_v4().as_simple()
521        );
522        let tmp_path = self.snapshot_dir.join(&unique_id);
523        let file = tokio::fs::OpenOptions::new()
524            .create(true)
525            .write(true)
526            .read(true)
527            .truncate(true)
528            .open(&tmp_path)
529            .await
530            .map_err(|e| StorageError::IO {
531                source: StorageIOError::write_snapshot(None, &e),
532            })?;
533
534        debug!(path = %tmp_path.display(), "Created temp file for incoming snapshot");
535        Ok(Box::new(file))
536    }
537
538    async fn install_snapshot(
539        &mut self,
540        meta: &RaftSnapshotMeta,
541        mut snapshot: Box<tokio::fs::File>,
542    ) -> std::result::Result<(), StorageError<NodeId>> {
543        // Read all data from the file (openraft filled it via AsyncWrite)
544        snapshot
545            .seek(std::io::SeekFrom::Start(0))
546            .await
547            .map_err(|e| StorageError::IO {
548                source: StorageIOError::read_snapshot(Some(meta.signature()), &e),
549            })?;
550
551        let mut data = Vec::new();
552        snapshot
553            .read_to_end(&mut data)
554            .await
555            .map_err(|e| StorageError::IO {
556                source: StorageIOError::read_snapshot(Some(meta.signature()), &e),
557            })?;
558
559        // Guard against oversized snapshots from a malicious/buggy leader
560        if data.len() > MAX_SNAPSHOT_SIZE {
561            return Err(StorageError::IO {
562                source: StorageIOError::read_snapshot(
563                    Some(meta.signature()),
564                    &std::io::Error::new(
565                        std::io::ErrorKind::InvalidData,
566                        format!(
567                            "incoming snapshot {} bytes exceeds maximum {} bytes",
568                            data.len(),
569                            MAX_SNAPSHOT_SIZE
570                        ),
571                    ),
572                ),
573            });
574        }
575
576        // Install the snapshot data into the state machine
577        self.install_snapshot_data(&data).await?;
578
579        // Update membership from snapshot meta
580        *self.membership.write().await = meta.last_membership.clone();
581
582        // Persist: move temp file to permanent location.
583        // Clean up any incoming temp files matching our PID pattern.
584        let snap_path = self.snapshot_dir.join(format!("{}.snap", meta.snapshot_id));
585        let pid_prefix = format!("incoming-{}-", std::process::id());
586        if let Ok(mut entries) = tokio::fs::read_dir(&self.snapshot_dir).await {
587            while let Ok(Some(entry)) = entries.next_entry().await {
588                let name = entry.file_name();
589                let name_str = name.to_string_lossy();
590                if name_str.starts_with(&pid_prefix) && name_str.ends_with(".snap.tmp") {
591                    let tmp_path = entry.path();
592                    // Rename the first matching temp file; remove any extras
593                    if !snap_path.exists() {
594                        let _ = tokio::fs::rename(&tmp_path, &snap_path).await.map_err(|e| {
595                            StorageError::IO {
596                                source: StorageIOError::write_snapshot(Some(meta.signature()), &e),
597                            }
598                        });
599                    } else {
600                        let _ = tokio::fs::remove_file(&tmp_path).await;
601                    }
602                }
603            }
604        }
605
606        // Update tracking
607        *self.current_snapshot_id.write().await = Some(meta.snapshot_id.clone());
608        *self.current_snapshot_meta.write().await = Some(meta.clone());
609
610        self.cleanup_old_snapshots(3).await;
611
612        info!(
613            snapshot_id = %meta.snapshot_id,
614            size_bytes = data.len(),
615            "Installed snapshot from leader and persisted to disk"
616        );
617        Ok(())
618    }
619
620    async fn get_current_snapshot(
621        &mut self,
622    ) -> std::result::Result<Option<RaftSnapshot>, StorageError<NodeId>> {
623        // Reuse existing snapshot if it is still on disk
624        if let Some(id) = self.current_snapshot_id.read().await.as_deref() {
625            let existing = self.snapshot_dir.join(format!("{}.snap", id));
626            if existing.exists() {
627                if let Some(meta) = self.current_snapshot_meta.read().await.clone() {
628                    let sig = meta.signature();
629                    let file =
630                        tokio::fs::File::open(&existing)
631                            .await
632                            .map_err(|e| StorageError::IO {
633                                source: StorageIOError::read_snapshot(Some(sig), &e),
634                            })?;
635                    return Ok(Some(Snapshot {
636                        meta,
637                        snapshot: Box::new(file),
638                    }));
639                }
640            }
641        }
642
643        // No usable snapshot on disk — create a fresh one
644        let (meta, snap_path) = self.create_snapshot().await?;
645        let sig = meta.signature();
646        let file = tokio::fs::File::open(&snap_path)
647            .await
648            .map_err(|e| StorageError::IO {
649                source: StorageIOError::read_snapshot(Some(sig), &e),
650            })?;
651        Ok(Some(Snapshot {
652            meta,
653            snapshot: Box::new(file),
654        }))
655    }
656
657    async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder {
658        // Acquire all read locks before cloning to ensure a consistent
659        // point-in-time snapshot. Individual sequential awaits would allow
660        // concurrent apply() calls to modify state between lock acquisitions,
661        // producing an inconsistent snapshot builder.
662        let metadata = self.metadata.read().await;
663        let last_applied = self.last_applied.read().await;
664        let membership = self.membership.read().await;
665        let snap_id = self.current_snapshot_id.read().await;
666        let snap_meta = self.current_snapshot_meta.read().await;
667
668        Self {
669            metadata: RwLock::new(metadata.clone()),
670            last_applied: RwLock::new(*last_applied),
671            membership: RwLock::new(membership.clone()),
672            snapshot_dir: self.snapshot_dir.clone(),
673            current_snapshot_id: RwLock::new(snap_id.clone()),
674            current_snapshot_meta: RwLock::new(snap_meta.clone()),
675        }
676    }
677}
678
679// Implement RaftSnapshotBuilder for StateMachine
680impl openraft::storage::RaftSnapshotBuilder<TypeConfig> for StateMachine {
681    async fn build_snapshot(&mut self) -> std::result::Result<RaftSnapshot, StorageError<NodeId>> {
682        let (meta, snap_path) = self.create_snapshot().await?;
683        let sig = meta.signature();
684        let file = tokio::fs::File::open(&snap_path)
685            .await
686            .map_err(|e| StorageError::IO {
687                source: StorageIOError::read_snapshot(Some(sig), &e),
688            })?;
689        Ok(Snapshot {
690            meta,
691            snapshot: Box::new(file),
692        })
693    }
694}
695
696// ============================================================================
697// Network Implementation (High-Performance Binary Protocol)
698// ============================================================================
699
700/// Serialization format for Raft RPCs
701/// Binary (postcard) is ~5-10x faster than JSON
/// Serialization format for Raft RPCs
///
/// Binary (postcard) is ~5-10x faster than JSON
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SerializationFormat {
    /// JSON format (for debugging/compatibility)
    Json,
    /// Binary format (for performance); this is the default
    #[default]
    Binary,
}
710
/// Compression configuration for Raft RPCs
#[derive(Debug, Clone)]
pub struct RaftCompressionConfig {
    /// Enable compression
    pub enabled: bool,
    /// Minimum payload size to compress (bytes); smaller payloads are sent raw
    pub min_size: usize,
    /// Use adaptive algorithm selection
    pub adaptive: bool,
}
721
722impl Default for RaftCompressionConfig {
723    fn default() -> Self {
724        Self {
725            enabled: true,
726            min_size: 1024, // Only compress payloads > 1KB
727            adaptive: true,
728        }
729    }
730}
731
/// HTTP header used for Raft RPC shared-secret authentication.
///
/// When `cluster_secret` is configured, every outgoing Raft
/// RPC includes this header and every incoming RPC must present a matching
/// value. Constant-time comparison is used to prevent timing attacks.
pub const CLUSTER_SECRET_HEADER: &str = "X-Rivven-Cluster-Secret";
738
/// Network factory for creating Raft network connections
///
/// Cloning is cheap: the node map is shared via `Arc` and the reqwest client
/// clones by reference internally.
#[derive(Clone)]
pub struct NetworkFactory {
    /// Node addresses (node ID → HTTP address), shared across clones
    nodes: Arc<RwLock<BTreeMap<NodeId, String>>>,
    /// HTTP client with connection pooling
    client: reqwest::Client,
    /// Serialization format
    format: SerializationFormat,
    /// Compression configuration
    compression: RaftCompressionConfig,
    /// Shared secret for Raft RPC authentication
    cluster_secret: Option<String>,
}
753
754impl NetworkFactory {
755    /// Create new network factory with binary serialization (fastest)
756    pub fn new() -> Result<Self> {
757        Self::with_format(SerializationFormat::Binary)
758    }
759
760    /// Create network factory with specific serialization format
761    pub fn with_format(format: SerializationFormat) -> Result<Self> {
762        Ok(Self {
763            nodes: Arc::new(RwLock::new(BTreeMap::new())),
764            client: reqwest::Client::builder()
765                .timeout(std::time::Duration::from_secs(5))
766                .pool_max_idle_per_host(10) // Connection pooling
767                .pool_idle_timeout(std::time::Duration::from_secs(60))
768                .tcp_keepalive(std::time::Duration::from_secs(30))
769                .tcp_nodelay(true) // Low latency
770                .build()
771                .map_err(|e| {
772                    ClusterError::Network(format!("Failed to create HTTP client: {}", e))
773                })?,
774            format,
775            compression: RaftCompressionConfig::default(),
776            cluster_secret: None,
777        })
778    }
779
780    /// Create network factory with compression config
781    pub fn with_compression(
782        format: SerializationFormat,
783        compression: RaftCompressionConfig,
784    ) -> Result<Self> {
785        Ok(Self {
786            compression,
787            ..Self::with_format(format)?
788        })
789    }
790
791    /// Set cluster secret for Raft RPC authentication.
792    pub fn with_cluster_secret(mut self, secret: Option<String>) -> Self {
793        self.cluster_secret = secret;
794        self
795    }
796
797    /// Register a node address
798    pub async fn add_node(&self, node_id: NodeId, addr: String) {
799        self.nodes.write().await.insert(node_id, addr);
800    }
801
802    /// Remove a node
803    pub async fn remove_node(&self, node_id: NodeId) {
804        self.nodes.write().await.remove(&node_id);
805    }
806}
807
/// Network implementation for a single target node
///
/// Created by [`NetworkFactory`]'s `new_client`; holds a clone of the shared
/// pooled HTTP client plus the per-factory format/compression/auth settings.
pub struct Network {
    /// Target node identifier (stored for debugging and logging purposes)
    #[allow(dead_code)]
    target: NodeId,
    /// Base HTTP address of the target node (routes are appended, e.g. `/raft/vote`)
    target_addr: String,
    /// Pooled HTTP client shared with the factory
    client: reqwest::Client,
    /// Wire format used for request and response bodies
    format: SerializationFormat,
    /// Compression settings for large payloads
    compression: RaftCompressionConfig,
    /// Shared secret for Raft RPC authentication
    cluster_secret: Option<String>,
}
820
821impl Network {
822    pub fn new(
823        target: NodeId,
824        target_addr: String,
825        client: reqwest::Client,
826        format: SerializationFormat,
827        compression: RaftCompressionConfig,
828        cluster_secret: Option<String>,
829    ) -> Self {
830        Self {
831            target,
832            target_addr,
833            client,
834            format,
835            compression,
836            cluster_secret,
837        }
838    }
839
840    /// Apply authentication header to request if cluster_secret is set.
841    fn apply_auth(&self, request: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
842        if let Some(ref secret) = self.cluster_secret {
843            request.header(CLUSTER_SECRET_HEADER, secret)
844        } else {
845            request
846        }
847    }
848
849    /// Serialize request based on format
850    fn serialize<T: Serialize>(&self, data: &T) -> std::result::Result<Vec<u8>, String> {
851        match self.format {
852            SerializationFormat::Json => serde_json::to_vec(data).map_err(|e| e.to_string()),
853            SerializationFormat::Binary => postcard::to_allocvec(data).map_err(|e| e.to_string()),
854        }
855    }
856
857    /// Deserialize response based on format
858    fn deserialize<T: serde::de::DeserializeOwned>(
859        &self,
860        data: &[u8],
861    ) -> std::result::Result<T, String> {
862        match self.format {
863            SerializationFormat::Json => serde_json::from_slice(data).map_err(|e| e.to_string()),
864            SerializationFormat::Binary => postcard::from_bytes(data).map_err(|e| e.to_string()),
865        }
866    }
867
868    /// Get content type header
869    fn content_type(&self) -> &'static str {
870        match self.format {
871            SerializationFormat::Json => "application/json",
872            SerializationFormat::Binary => "application/octet-stream",
873        }
874    }
875
876    /// Compress data if enabled and beneficial
877    #[cfg(feature = "compression")]
878    fn maybe_compress(&self, data: Vec<u8>) -> (Vec<u8>, bool) {
879        use rivven_core::compression::{CompressionConfig, Compressor};
880
881        if !self.compression.enabled || data.len() < self.compression.min_size {
882            return (data, false);
883        }
884
885        let config = CompressionConfig {
886            min_size: self.compression.min_size,
887            adaptive: self.compression.adaptive,
888            ..Default::default()
889        };
890        let compressor = Compressor::with_config(config);
891
892        match compressor.compress(&data) {
893            Ok(compressed) => {
894                // Only use compression if it actually helps
895                if compressed.len() < data.len() {
896                    (compressed.to_vec(), true)
897                } else {
898                    (data, false)
899                }
900            }
901            Err(_) => (data, false),
902        }
903    }
904
905    #[cfg(not(feature = "compression"))]
906    fn maybe_compress(&self, data: Vec<u8>) -> (Vec<u8>, bool) {
907        (data, false)
908    }
909
910    /// Decompress data if it was compressed
911    #[cfg(feature = "compression")]
912    fn maybe_decompress(
913        &self,
914        data: &[u8],
915        was_compressed: bool,
916    ) -> std::result::Result<Vec<u8>, String> {
917        use rivven_core::compression::Compressor;
918
919        if !was_compressed {
920            return Ok(data.to_vec());
921        }
922
923        let compressor = Compressor::new();
924        compressor
925            .decompress(data)
926            .map(|b| b.to_vec())
927            .map_err(|e| e.to_string())
928    }
929
930    #[cfg(not(feature = "compression"))]
931    fn maybe_decompress(
932        &self,
933        data: &[u8],
934        _was_compressed: bool,
935    ) -> std::result::Result<Vec<u8>, String> {
936        Ok(data.to_vec())
937    }
938}
939
940// Implement RaftNetworkFactory
941impl RaftNetworkFactory<TypeConfig> for NetworkFactory {
942    type Network = Network;
943
944    async fn new_client(&mut self, target: NodeId, node: &BasicNode) -> Self::Network {
945        Network::new(
946            target,
947            node.addr.clone(),
948            self.client.clone(),
949            self.format,
950            self.compression.clone(),
951            self.cluster_secret.clone(),
952        )
953    }
954}
955
956// Implement RaftNetwork with optimized binary serialization and compression
957impl RaftNetwork<TypeConfig> for Network {
958    async fn append_entries(
959        &mut self,
960        rpc: openraft::raft::AppendEntriesRequest<TypeConfig>,
961        _option: RPCOption,
962    ) -> std::result::Result<
963        openraft::raft::AppendEntriesResponse<NodeId>,
964        openraft::error::RPCError<NodeId, BasicNode, openraft::error::RaftError<NodeId>>,
965    > {
966        use crate::observability::{NetworkMetrics, RaftMetrics};
967        let start = std::time::Instant::now();
968
969        let url = format!("{}/raft/append", self.target_addr);
970        let serialized = self.serialize(&rpc).map_err(|e| {
971            openraft::error::RPCError::Network(openraft::error::NetworkError::new(
972                &NetworkErrorWrapper(e),
973            ))
974        })?;
975
976        // Apply compression if beneficial
977        let (body, compressed) = self.maybe_compress(serialized);
978        let uncompressed_size = body.len();
979
980        NetworkMetrics::add_bytes_sent(body.len() as u64);
981        RaftMetrics::increment_append_entries_sent();
982
983        // Add compression header if compressed
984        let mut request = self.client.post(&url).body(body);
985        request = request.header("Content-Type", self.content_type());
986        request = self.apply_auth(request);
987        if compressed {
988            request = request.header("X-Rivven-Compressed", "1");
989            request = request.header("X-Rivven-Original-Size", uncompressed_size.to_string());
990        }
991
992        let resp = request.send().await.map_err(|e| {
993            NetworkMetrics::increment_rpc_errors("append_entries");
994            openraft::error::RPCError::Network(openraft::error::NetworkError::new(&e))
995        })?;
996
997        if !resp.status().is_success() {
998            NetworkMetrics::increment_rpc_errors("append_entries");
999            return Err(openraft::error::RPCError::Network(
1000                openraft::error::NetworkError::new(&NetworkErrorWrapper(format!(
1001                    "HTTP error: {}",
1002                    resp.status()
1003                ))),
1004            ));
1005        }
1006
1007        // Check if response is compressed
1008        let resp_compressed = resp
1009            .headers()
1010            .get("X-Rivven-Compressed")
1011            .map(|v| v == "1")
1012            .unwrap_or(false);
1013
1014        let bytes = resp.bytes().await.map_err(|e| {
1015            openraft::error::RPCError::Network(openraft::error::NetworkError::new(&e))
1016        })?;
1017
1018        NetworkMetrics::add_bytes_received(bytes.len() as u64);
1019        RaftMetrics::record_append_entries_latency(start.elapsed());
1020
1021        // Decompress if needed
1022        let response_data = self
1023            .maybe_decompress(&bytes, resp_compressed)
1024            .map_err(|e| {
1025                openraft::error::RPCError::Network(openraft::error::NetworkError::new(
1026                    &NetworkErrorWrapper(e),
1027                ))
1028            })?;
1029
1030        let response: openraft::raft::AppendEntriesResponse<NodeId> =
1031            self.deserialize(&response_data).map_err(|e| {
1032                openraft::error::RPCError::Network(openraft::error::NetworkError::new(
1033                    &NetworkErrorWrapper(e),
1034                ))
1035            })?;
1036
1037        Ok(response)
1038    }
1039
1040    async fn install_snapshot(
1041        &mut self,
1042        rpc: openraft::raft::InstallSnapshotRequest<TypeConfig>,
1043        _option: RPCOption,
1044    ) -> std::result::Result<
1045        openraft::raft::InstallSnapshotResponse<NodeId>,
1046        openraft::error::RPCError<
1047            NodeId,
1048            BasicNode,
1049            openraft::error::RaftError<NodeId, openraft::error::InstallSnapshotError>,
1050        >,
1051    > {
1052        use crate::observability::{NetworkMetrics, RaftMetrics};
1053        let start = std::time::Instant::now();
1054
1055        let url = format!("{}/raft/snapshot", self.target_addr);
1056        let serialized = self.serialize(&rpc).map_err(|e| {
1057            openraft::error::RPCError::Network(openraft::error::NetworkError::new(
1058                &NetworkErrorWrapper(e),
1059            ))
1060        })?;
1061
1062        // Snapshots benefit significantly from compression
1063        let (body, compressed) = self.maybe_compress(serialized);
1064        let uncompressed_size = body.len();
1065
1066        NetworkMetrics::add_bytes_sent(body.len() as u64);
1067
1068        // Add compression header if compressed
1069        let mut request = self.client.post(&url).body(body);
1070        request = request.header("Content-Type", self.content_type());
1071        request = self.apply_auth(request);
1072        if compressed {
1073            request = request.header("X-Rivven-Compressed", "1");
1074            request = request.header("X-Rivven-Original-Size", uncompressed_size.to_string());
1075        }
1076
1077        let resp = request.send().await.map_err(|e| {
1078            NetworkMetrics::increment_rpc_errors("install_snapshot");
1079            openraft::error::RPCError::Network(openraft::error::NetworkError::new(&e))
1080        })?;
1081
1082        if !resp.status().is_success() {
1083            NetworkMetrics::increment_rpc_errors("install_snapshot");
1084            return Err(openraft::error::RPCError::Network(
1085                openraft::error::NetworkError::new(&NetworkErrorWrapper(format!(
1086                    "HTTP error: {}",
1087                    resp.status()
1088                ))),
1089            ));
1090        }
1091
1092        let bytes = resp.bytes().await.map_err(|e| {
1093            openraft::error::RPCError::Network(openraft::error::NetworkError::new(&e))
1094        })?;
1095
1096        NetworkMetrics::add_bytes_received(bytes.len() as u64);
1097        RaftMetrics::record_snapshot_duration(start.elapsed());
1098
1099        let response: openraft::raft::InstallSnapshotResponse<NodeId> =
1100            self.deserialize(&bytes).map_err(|e| {
1101                openraft::error::RPCError::Network(openraft::error::NetworkError::new(
1102                    &NetworkErrorWrapper(e),
1103                ))
1104            })?;
1105
1106        Ok(response)
1107    }
1108
1109    async fn vote(
1110        &mut self,
1111        rpc: openraft::raft::VoteRequest<NodeId>,
1112        _option: RPCOption,
1113    ) -> std::result::Result<
1114        openraft::raft::VoteResponse<NodeId>,
1115        openraft::error::RPCError<NodeId, BasicNode, openraft::error::RaftError<NodeId>>,
1116    > {
1117        use crate::observability::{NetworkMetrics, RaftMetrics};
1118        let start = std::time::Instant::now();
1119
1120        let url = format!("{}/raft/vote", self.target_addr);
1121        let body = self.serialize(&rpc).map_err(|e| {
1122            openraft::error::RPCError::Network(openraft::error::NetworkError::new(
1123                &NetworkErrorWrapper(e),
1124            ))
1125        })?;
1126
1127        NetworkMetrics::add_bytes_sent(body.len() as u64);
1128
1129        let resp = self
1130            .client
1131            .post(&url)
1132            .body(body)
1133            .header("Content-Type", self.content_type());
1134        let resp = self.apply_auth(resp);
1135        let resp = resp.send().await.map_err(|e| {
1136            NetworkMetrics::increment_rpc_errors("vote");
1137            openraft::error::RPCError::Network(openraft::error::NetworkError::new(&e))
1138        })?;
1139
1140        if !resp.status().is_success() {
1141            NetworkMetrics::increment_rpc_errors("vote");
1142            return Err(openraft::error::RPCError::Network(
1143                openraft::error::NetworkError::new(&NetworkErrorWrapper(format!(
1144                    "HTTP error: {}",
1145                    resp.status()
1146                ))),
1147            ));
1148        }
1149
1150        let bytes = resp.bytes().await.map_err(|e| {
1151            openraft::error::RPCError::Network(openraft::error::NetworkError::new(&e))
1152        })?;
1153
1154        NetworkMetrics::add_bytes_received(bytes.len() as u64);
1155        RaftMetrics::record_vote_latency(start.elapsed());
1156        RaftMetrics::increment_elections();
1157
1158        let response: openraft::raft::VoteResponse<NodeId> =
1159            self.deserialize(&bytes).map_err(|e| {
1160                openraft::error::RPCError::Network(openraft::error::NetworkError::new(
1161                    &NetworkErrorWrapper(e),
1162                ))
1163            })?;
1164
1165        Ok(response)
1166    }
1167}
1168
1169// ============================================================================
1170// High-Level Raft Node API
1171// ============================================================================
1172
/// Configuration for Raft consensus
#[derive(Debug, Clone)]
pub struct RaftNodeConfig {
    /// Our node ID (will be hashed to u64)
    pub node_id: String,
    /// Whether in standalone mode (no consensus; commands apply locally)
    pub standalone: bool,
    /// Data directory for Raft storage (log + snapshots)
    pub data_dir: std::path::PathBuf,
    /// Heartbeat interval in milliseconds
    pub heartbeat_interval_ms: u64,
    /// Minimum election timeout in milliseconds
    pub election_timeout_min_ms: u64,
    /// Maximum election timeout in milliseconds
    pub election_timeout_max_ms: u64,
    /// Snapshot threshold (log entries before snapshot)
    pub snapshot_threshold: u64,
    /// Initial cluster members (for bootstrapping)
    pub initial_members: Vec<(NodeId, BasicNode)>,
    /// Shared secret for Raft RPC authentication
    pub cluster_secret: Option<String>,
}
1194
1195// ============================================================================
1196// Batch Proposal Accumulator
1197// ============================================================================
1198
/// A pending batch of proposals waiting to be submitted
///
/// Invariant: `commands` and `responders` are index-aligned — the i-th
/// responder receives the outcome of the i-th command.
#[allow(dead_code)]
pub(crate) struct PendingBatch {
    /// Commands accumulated in this batch
    commands: Vec<MetadataCommand>,
    /// Response channels, one per entry in `commands`
    responders: Vec<tokio::sync::oneshot::Sender<Result<MetadataResponse>>>,
    /// When this batch started accumulating (drives the linger timeout)
    started: std::time::Instant,
}
1209
/// Configuration for batch proposals
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Maximum batch size before forcing a flush
    pub max_batch_size: usize,
    /// Maximum time to wait before flushing (microseconds); this is the
    /// worst-case extra latency a single proposal can incur from batching
    pub max_wait_us: u64,
    /// Enable batching (false = immediate proposals)
    pub enabled: bool,
}
1220
1221impl Default for BatchConfig {
1222    fn default() -> Self {
1223        Self {
1224            max_batch_size: 100,
1225            max_wait_us: 1000, // 1ms default linger
1226            enabled: true,
1227        }
1228    }
1229}
1230
/// Batch proposal accumulator for high-throughput writes
///
/// This batches multiple writes over a short time window and submits them
/// as a single Raft proposal, amortizing the consensus overhead.
///
/// Throughput improvement: 10-50x for small writes
/// Latency trade-off: adds up to `max_wait_us` latency
pub struct BatchAccumulator {
    /// Current pending batch (Mutex instead of RwLock — no read path)
    pending: tokio::sync::Mutex<Option<PendingBatch>>,
    /// Batch configuration (size and linger thresholds)
    config: BatchConfig,
    /// Notification channel used to wake the flusher task on new items
    notify: tokio::sync::Notify,
}
1246
1247impl BatchAccumulator {
1248    /// Create a new batch accumulator
1249    pub fn new(config: BatchConfig) -> Self {
1250        Self {
1251            pending: tokio::sync::Mutex::new(None),
1252            config,
1253            notify: tokio::sync::Notify::new(),
1254        }
1255    }
1256
1257    /// Add a command to the current batch, returning a channel for the response
1258    pub async fn add(
1259        &self,
1260        command: MetadataCommand,
1261    ) -> tokio::sync::oneshot::Receiver<Result<MetadataResponse>> {
1262        let (tx, rx) = tokio::sync::oneshot::channel();
1263
1264        let should_flush = {
1265            let mut pending = self.pending.lock().await;
1266
1267            if pending.is_none() {
1268                *pending = Some(PendingBatch {
1269                    commands: vec![command],
1270                    responders: vec![tx],
1271                    started: std::time::Instant::now(),
1272                });
1273                false
1274            } else if let Some(batch) = pending.as_mut() {
1275                batch.commands.push(command);
1276                batch.responders.push(tx);
1277                batch.commands.len() >= self.config.max_batch_size
1278            } else {
1279                unreachable!("pending was checked to be Some")
1280            }
1281        };
1282
1283        self.notify.notify_one();
1284
1285        if should_flush {
1286            // Force flush if batch is full
1287            self.notify.notify_one();
1288        }
1289
1290        rx
1291    }
1292
1293    /// Take the current batch if ready (full or timed out)
1294    #[allow(dead_code)]
1295    pub(crate) async fn take_if_ready(&self) -> Option<PendingBatch> {
1296        let mut pending = self.pending.lock().await;
1297
1298        if let Some(ref batch) = *pending {
1299            let elapsed = batch.started.elapsed();
1300            let size = batch.commands.len();
1301
1302            if size >= self.config.max_batch_size
1303                || elapsed.as_micros() as u64 >= self.config.max_wait_us
1304            {
1305                return pending.take();
1306            }
1307        }
1308        None
1309    }
1310
1311    /// Wait for the batch to be ready
1312    pub async fn wait_ready(&self) {
1313        let timeout = std::time::Duration::from_micros(self.config.max_wait_us);
1314        let _ = tokio::time::timeout(timeout, self.notify.notified()).await;
1315    }
1316}
1317
1318impl Default for RaftNodeConfig {
1319    fn default() -> Self {
1320        Self {
1321            node_id: "node-1".to_string(),
1322            standalone: true,
1323            data_dir: std::path::PathBuf::from("./data/raft"),
1324            heartbeat_interval_ms: 150,
1325            election_timeout_min_ms: 300,
1326            election_timeout_max_ms: 600,
1327            snapshot_threshold: 10000,
1328            initial_members: vec![],
1329            cluster_secret: None,
1330        }
1331    }
1332}
1333
/// High-level Raft node wrapper
///
/// Abstracts over two modes: standalone (commands applied directly to the
/// local state machine, no consensus) and cluster (commands replicated via
/// openraft). `start()` must be called before cluster-mode operations.
pub struct RaftNode {
    /// The openraft instance (only in cluster mode, set by `start()`)
    raft: Option<openraft::Raft<TypeConfig>>,
    /// Log storage (for cluster mode reference, kept for state consistency)
    #[allow(dead_code)]
    log_store: Option<Arc<LogStore>>,
    /// State machine (for direct access in standalone mode)
    state_machine: StateMachine,
    /// Network factory (stores node addresses)
    network: NetworkFactory,
    /// Our node ID (hash of the string node id)
    node_id: NodeId,
    /// String node ID (original)
    node_id_str: String,
    /// Whether we're in standalone mode
    standalone: bool,
    /// Next log index (for standalone mode only)
    next_index: RwLock<u64>,
    /// Data directory
    data_dir: std::path::PathBuf,
    /// Raft config for start
    raft_config: RaftNodeConfig,
    /// Shared secret for Raft RPC authentication
    cluster_secret: Option<String>,
}
1360
1361impl RaftNode {
1362    /// Create a new Raft node from cluster config
1363    pub async fn new(config: &ClusterConfig) -> Result<Self> {
1364        let raft_config = RaftNodeConfig {
1365            node_id: config.node_id.clone(),
1366            standalone: config.mode == crate::config::ClusterMode::Standalone,
1367            data_dir: config.data_dir.join("raft"),
1368            heartbeat_interval_ms: config.raft.heartbeat_interval.as_millis() as u64,
1369            election_timeout_min_ms: config.raft.election_timeout_min.as_millis() as u64,
1370            election_timeout_max_ms: config.raft.election_timeout_max.as_millis() as u64,
1371            snapshot_threshold: config.raft.snapshot_threshold,
1372            initial_members: vec![],
1373            cluster_secret: config.raft.cluster_secret.clone(),
1374        };
1375        Self::with_config(raft_config).await
1376    }
1377
1378    /// Create a new Raft node with explicit configuration
1379    pub async fn with_config(config: RaftNodeConfig) -> Result<Self> {
1380        std::fs::create_dir_all(&config.data_dir)
1381            .map_err(|e| ClusterError::RaftStorage(e.to_string()))?;
1382
1383        let snapshot_dir = config.data_dir.join("snapshots");
1384        let state_machine = StateMachine::new(snapshot_dir);
1385        let network = NetworkFactory::new()
1386            .map_err(|e| {
1387                ClusterError::RaftStorage(format!("Failed to create network factory: {}", e))
1388            })?
1389            .with_cluster_secret(config.cluster_secret.clone());
1390        let node_id = hash_node_id(&config.node_id);
1391
1392        info!(
1393            node_id,
1394            node_id_str = %config.node_id,
1395            standalone = config.standalone,
1396            data_dir = %config.data_dir.display(),
1397            "Created Raft node"
1398        );
1399
1400        Ok(Self {
1401            raft: None,
1402            log_store: None,
1403            state_machine,
1404            network,
1405            node_id,
1406            node_id_str: config.node_id.clone(),
1407            standalone: config.standalone,
1408            next_index: RwLock::new(1),
1409            data_dir: config.data_dir.clone(),
1410            cluster_secret: config.cluster_secret.clone(),
1411            raft_config: config,
1412        })
1413    }
1414
    /// Initialize and start the Raft instance
    ///
    /// In standalone mode this is a no-op. In cluster mode it opens the
    /// redb-backed log store, restores the latest on-disk snapshot (if any),
    /// rebuilds the network factory with the currently-known peer addresses,
    /// and constructs the openraft instance.
    pub async fn start(&mut self) -> Result<()> {
        if self.standalone {
            info!(node_id = self.node_id, "Starting in standalone mode");
            return Ok(());
        }

        // Initialize log storage
        let log_store = LogStore::new(&self.data_dir)
            .map_err(|e| ClusterError::RaftStorage(format!("Failed to create log store: {}", e)))?;

        // Build openraft config from our millisecond-based settings
        let raft_config = openraft::Config {
            cluster_name: "rivven-cluster".to_string(),
            heartbeat_interval: self.raft_config.heartbeat_interval_ms,
            election_timeout_min: self.raft_config.election_timeout_min_ms,
            election_timeout_max: self.raft_config.election_timeout_max_ms,
            snapshot_policy: openraft::SnapshotPolicy::LogsSinceLast(
                self.raft_config.snapshot_threshold,
            ),
            // Keep a bounded log tail after snapshotting so slightly-lagging
            // followers can catch up without a full snapshot transfer.
            max_in_snapshot_log_to_keep: 1000,
            ..Default::default()
        };

        let raft_config = Arc::new(
            raft_config
                .validate()
                .map_err(|e| ClusterError::RaftStorage(format!("Invalid Raft config: {}", e)))?,
        );

        // Create a new state machine for openraft (it takes ownership;
        // self.state_machine remains for standalone-style direct access)
        let snapshot_dir = self.data_dir.join("snapshots");
        let state_machine = StateMachine::new(&snapshot_dir);

        // §2.5: Load the latest snapshot from disk to avoid replaying the entire log
        match state_machine.load_latest_snapshot().await {
            Ok(true) => info!("Restored state machine from snapshot file"),
            Ok(false) => debug!("No existing snapshot found, starting fresh"),
            Err(e) => warn!(error = %e, "Failed to load snapshot, starting fresh"),
        }

        // Create a new network factory for openraft (it takes ownership)
        let network = NetworkFactory::new()
            .map_err(|e| ClusterError::Network(format!("Failed to create network factory: {}", e)))?
            .with_cluster_secret(self.cluster_secret.clone());
        // Copy node addresses to the new network
        for (id, addr) in self.network.nodes.read().await.iter() {
            network.add_node(*id, addr.clone()).await;
        }

        // Create the Raft instance
        let raft =
            openraft::Raft::new(self.node_id, raft_config, network, log_store, state_machine)
                .await
                .map_err(|e| ClusterError::RaftStorage(format!("Failed to create Raft: {}", e)))?;

        self.raft = Some(raft);

        info!(
            node_id = self.node_id,
            node_id_str = %self.node_id_str,
            "Cluster mode Raft initialized and ready"
        );
        Ok(())
    }
1480
1481    /// Initialize cluster with initial membership (bootstrap)
1482    /// This should only be called on the first node of a new cluster
1483    pub async fn bootstrap(&self, members: BTreeMap<NodeId, BasicNode>) -> Result<()> {
1484        if self.standalone {
1485            return Ok(());
1486        }
1487
1488        if let Some(ref raft) = self.raft {
1489            raft.initialize(members)
1490                .await
1491                .map_err(|e| ClusterError::RaftStorage(format!("Failed to bootstrap: {}", e)))?;
1492            info!(node_id = self.node_id, "Bootstrapped Raft cluster");
1493        }
1494        Ok(())
1495    }
1496
1497    /// Propose a command to the Raft cluster
1498    pub async fn propose(&self, command: MetadataCommand) -> Result<MetadataResponse> {
1499        use crate::observability::RaftMetrics;
1500        let start = std::time::Instant::now();
1501
1502        if self.standalone {
1503            // In standalone mode, apply directly to state machine
1504            let index = {
1505                let mut next = self.next_index.write().await;
1506                let idx = *next;
1507                *next += 1;
1508                idx
1509            };
1510            // Use term 0 for standalone entries.  Real Raft elections
1511            // start at term 1, so term-0 entries can never conflict with
1512            // cluster-mode log entries, enabling a future standalone-to-cluster
1513            // migration path.
1514            let log_id = LogId::new(openraft::CommittedLeaderId::new(0, self.node_id), index);
1515            let response = self.state_machine.apply_command(&log_id, command).await;
1516
1517            RaftMetrics::increment_proposals();
1518            RaftMetrics::increment_commits();
1519            RaftMetrics::record_proposal_latency(start.elapsed());
1520
1521            return Ok(response.response);
1522        }
1523
1524        // Cluster mode - use Raft client_write
1525        if let Some(ref raft) = self.raft {
1526            let request = RaftRequest { command };
1527            let result = raft
1528                .client_write(request)
1529                .await
1530                .map_err(|e| ClusterError::RaftStorage(format!("Client write failed: {}", e)))?;
1531
1532            RaftMetrics::increment_proposals();
1533            RaftMetrics::increment_commits();
1534            RaftMetrics::record_proposal_latency(start.elapsed());
1535
1536            return Ok(result.data.response);
1537        }
1538
1539        Err(ClusterError::RaftStorage(
1540            "Raft not initialized".to_string(),
1541        ))
1542    }
1543
1544    /// Propose multiple commands in a single batch for higher throughput
1545    ///
1546    /// This is more efficient than calling propose() multiple times because:
1547    /// 1. Single Raft consensus round for all commands
1548    /// 2. Single disk fsync for all log entries
1549    /// 3. Amortized network overhead
1550    ///
1551    /// Returns responses in the same order as commands.
1552    pub async fn propose_batch(
1553        &self,
1554        commands: Vec<MetadataCommand>,
1555    ) -> Result<Vec<MetadataResponse>> {
1556        use crate::observability::RaftMetrics;
1557
1558        if commands.is_empty() {
1559            return Ok(vec![]);
1560        }
1561
1562        let batch_size = commands.len();
1563        RaftMetrics::record_batch_size(batch_size);
1564
1565        if self.standalone {
1566            // In standalone mode, apply all directly
1567            let mut responses = Vec::with_capacity(commands.len());
1568            for command in commands {
1569                let index = {
1570                    let mut next = self.next_index.write().await;
1571                    let idx = *next;
1572                    *next += 1;
1573                    idx
1574                };
1575                // Use term 0 for standalone entries — see propose() for rationale.
1576                let log_id = LogId::new(openraft::CommittedLeaderId::new(0, self.node_id), index);
1577                let response = self.state_machine.apply_command(&log_id, command).await;
1578                responses.push(response.response);
1579            }
1580            return Ok(responses);
1581        }
1582
1583        // Cluster mode - submit as a single atomic Batch command
1584        // This goes through one Raft consensus round and one fsync
1585        if let Some(ref raft) = self.raft {
1586            let batch_command = MetadataCommand::Batch(commands);
1587            let request = RaftRequest {
1588                command: batch_command,
1589            };
1590
1591            let result = raft
1592                .client_write(request)
1593                .await
1594                .map_err(|e| ClusterError::RaftStorage(format!("Batch write failed: {}", e)))?;
1595
1596            RaftMetrics::increment_proposals();
1597            RaftMetrics::increment_commits();
1598
1599            // Extract per-command responses from the batch result
1600            match result.data.response {
1601                MetadataResponse::BatchResponses(responses) => return Ok(responses),
1602                // Fallback: if the state machine returned a non-batch response
1603                // (e.g., during rolling upgrade), replicate it for each command
1604                other => return Ok(vec![other; batch_size]),
1605            }
1606        }
1607
1608        Err(ClusterError::RaftStorage(
1609            "Raft not initialized".to_string(),
1610        ))
1611    }
1612
1613    /// Ensure linearizable read by confirming leadership with cluster
1614    ///
1615    /// This implements the ReadIndex optimization from the Raft paper.
1616    /// It allows any node to serve consistent reads by:
1617    /// 1. Leader records current commit index as read_index
1618    /// 2. Leader confirms it's still leader (heartbeat quorum)
1619    /// 3. Wait for applied index >= read_index
1620    /// 4. Return to client - data is linearizable
1621    ///
1622    /// This is 10-100x faster than read-via-propose since it doesn't write to log.
1623    pub async fn ensure_linearizable_read(&self) -> Result<()> {
1624        if self.standalone {
1625            // Standalone mode - always linearizable (single node)
1626            return Ok(());
1627        }
1628
1629        if let Some(ref raft) = self.raft {
1630            // Use openraft's built-in linearizable read mechanism
1631            // This waits for the state machine to catch up to the commit index
1632            let applied = raft.ensure_linearizable().await.map_err(|e| {
1633                ClusterError::RaftStorage(format!("Linearizable read failed: {}", e))
1634            })?;
1635
1636            debug!(
1637                applied_log = %applied.map(|l| l.index.to_string()).unwrap_or_else(|| "none".to_string()),
1638                "Linearizable read confirmed"
1639            );
1640            return Ok(());
1641        }
1642
1643        Err(ClusterError::RaftStorage(
1644            "Raft not initialized".to_string(),
1645        ))
1646    }
1647
1648    /// Read metadata with linearizable consistency
1649    ///
1650    /// This ensures the read reflects all committed writes up to this point.
1651    /// Slightly slower than eventual reads but guarantees consistency.
1652    pub async fn linearizable_metadata(
1653        &self,
1654    ) -> Result<tokio::sync::RwLockReadGuard<'_, ClusterMetadata>> {
1655        // First ensure we're up to date
1656        self.ensure_linearizable_read().await?;
1657        // Then return the metadata
1658        Ok(self.state_machine.metadata().await)
1659    }
1660
1661    /// Get current metadata
1662    pub async fn metadata(&self) -> tokio::sync::RwLockReadGuard<'_, ClusterMetadata> {
1663        self.state_machine.metadata().await
1664    }
1665
1666    /// Check if this node is the Raft leader
1667    pub fn is_leader(&self) -> bool {
1668        if self.standalone {
1669            return true;
1670        }
1671
1672        if let Some(ref raft) = self.raft {
1673            let metrics = raft.metrics().borrow().clone();
1674            return metrics.current_leader == Some(self.node_id);
1675        }
1676        false
1677    }
1678
1679    /// Get current leader node ID
1680    pub fn leader(&self) -> Option<NodeId> {
1681        if self.standalone {
1682            return Some(self.node_id);
1683        }
1684
1685        if let Some(ref raft) = self.raft {
1686            let metrics = raft.metrics().borrow().clone();
1687            return metrics.current_leader;
1688        }
1689        None
1690    }
1691
1692    /// Get our node ID
1693    pub fn node_id(&self) -> NodeId {
1694        self.node_id
1695    }
1696
1697    /// Get our string node ID
1698    pub fn node_id_str(&self) -> &str {
1699        &self.node_id_str
1700    }
1701
1702    /// Get the underlying Raft instance (for advanced operations)
1703    pub fn get_raft(&self) -> Option<&openraft::Raft<TypeConfig>> {
1704        self.raft.as_ref()
1705    }
1706
1707    /// Add a node to the network (for cluster mode)
1708    pub async fn add_peer(&self, node_id: NodeId, addr: String) {
1709        self.network.add_node(node_id, addr).await;
1710    }
1711
1712    /// Remove a node from the network
1713    pub async fn remove_peer(&self, node_id: NodeId) {
1714        self.network.remove_node(node_id).await;
1715    }
1716
1717    /// Snapshot the current state (in standalone mode)
1718    pub async fn snapshot(&self) -> Result<()> {
1719        // In cluster mode, snapshots are managed by openraft
1720        if !self.standalone {
1721            if let Some(ref raft) = self.raft {
1722                raft.trigger().snapshot().await.map_err(|e| {
1723                    ClusterError::RaftStorage(format!("Snapshot trigger failed: {}", e))
1724                })?;
1725                info!(node_id = self.node_id, "Triggered Raft snapshot");
1726                return Ok(());
1727            }
1728        }
1729
1730        // In standalone mode, create snapshot directly
1731        let (_meta, data) = self
1732            .state_machine
1733            .create_snapshot()
1734            .await
1735            .map_err(|e| ClusterError::RaftStorage(format!("{}", e)))?;
1736
1737        info!(path = %data.display(), "Created standalone snapshot");
1738        Ok(())
1739    }
1740
1741    /// Get Raft metrics (for monitoring)
1742    pub fn metrics(&self) -> Option<openraft::RaftMetrics<NodeId, BasicNode>> {
1743        self.raft.as_ref().map(|r| r.metrics().borrow().clone())
1744    }
1745
1746    // =========================================================================
1747    // Raft RPC Handlers (for HTTP endpoint integration)
1748    // =========================================================================
1749
1750    /// Handle AppendEntries RPC from another node
1751    pub async fn handle_append_entries(
1752        &self,
1753        req: openraft::raft::AppendEntriesRequest<TypeConfig>,
1754    ) -> std::result::Result<openraft::raft::AppendEntriesResponse<NodeId>, ClusterError> {
1755        if let Some(ref raft) = self.raft {
1756            raft.append_entries(req)
1757                .await
1758                .map_err(|e| ClusterError::RaftStorage(format!("{}", e)))
1759        } else {
1760            Err(ClusterError::RaftStorage(
1761                "Raft not initialized".to_string(),
1762            ))
1763        }
1764    }
1765
1766    /// Handle InstallSnapshot RPC from leader
1767    pub async fn handle_install_snapshot(
1768        &self,
1769        req: openraft::raft::InstallSnapshotRequest<TypeConfig>,
1770    ) -> std::result::Result<openraft::raft::InstallSnapshotResponse<NodeId>, ClusterError> {
1771        if let Some(ref raft) = self.raft {
1772            raft.install_snapshot(req)
1773                .await
1774                .map_err(|e| ClusterError::RaftStorage(format!("{}", e)))
1775        } else {
1776            Err(ClusterError::RaftStorage(
1777                "Raft not initialized".to_string(),
1778            ))
1779        }
1780    }
1781
1782    /// Handle Vote RPC during election
1783    pub async fn handle_vote(
1784        &self,
1785        req: openraft::raft::VoteRequest<NodeId>,
1786    ) -> std::result::Result<openraft::raft::VoteResponse<NodeId>, ClusterError> {
1787        if let Some(ref raft) = self.raft {
1788            raft.vote(req)
1789                .await
1790                .map_err(|e| ClusterError::RaftStorage(format!("{}", e)))
1791        } else {
1792            Err(ClusterError::RaftStorage(
1793                "Raft not initialized".to_string(),
1794            ))
1795        }
1796    }
1797
    /// Verify that an incoming Raft RPC carries the correct cluster secret.
    ///
    /// HTTP handlers should call this before dispatching to
    /// `handle_append_entries` / `handle_install_snapshot` / `handle_vote`.
    /// Returns `Ok(())` when:
    ///   - No cluster_secret is configured (open cluster, backward compat), or
    ///   - The provided header value matches the configured secret.
    ///
    /// Returns `Err(Unauthorized)` when a secret is configured and the header
    /// is missing or does not match. Uses constant-time comparison to prevent
    /// timing side-channels.
    pub fn verify_cluster_secret(
        &self,
        header_value: Option<&str>,
    ) -> std::result::Result<(), ClusterError> {
        let Some(ref expected) = self.cluster_secret else {
            return Ok(()); // No secret configured — open cluster
        };

        let Some(provided) = header_value else {
            return Err(ClusterError::Unauthorized(
                "Missing X-Rivven-Cluster-Secret header".to_string(),
            ));
        };

        // Constant-time comparison to prevent timing attacks.
        // Pad shorter secret to the longer length so the length is never
        // leaked via an early-return timing side-channel.
        let expected_bytes = expected.as_bytes();
        let provided_bytes = provided.as_bytes();
        let max_len = expected_bytes.len().max(provided_bytes.len());
        // Length mismatch contributes to `diff` but does NOT short-circuit.
        let mut diff = (expected_bytes.len() != provided_bytes.len()) as u8;
        for i in 0..max_len {
            // Out-of-range positions read as 0, so both inputs are walked for
            // the full `max_len` — no data-dependent branches in the loop body.
            let a = expected_bytes.get(i).copied().unwrap_or(0);
            let b = provided_bytes.get(i).copied().unwrap_or(0);
            // Accumulate all byte differences bitwise; checked once at the end.
            diff |= a ^ b;
        }
        if diff != 0 {
            return Err(ClusterError::Unauthorized(
                "Invalid cluster secret".to_string(),
            ));
        }

        Ok(())
    }
1844
1845    /// Get the cluster secret (for HTTP server setup).
1846    pub fn cluster_secret(&self) -> Option<&str> {
1847        self.cluster_secret.as_deref()
1848    }
1849}
1850
1851// ============================================================================
1852// Utility Functions
1853// ============================================================================
1854
1855/// Hash a string node ID to u64 for Raft compatibility
1856///
1857/// Uses deterministic FNV-1a hash instead of `DefaultHasher`.
1858/// `DefaultHasher` (SipHash) is not guaranteed to be stable across Rust
1859/// versions or platforms, which could cause node ID mismatches in a cluster.
1860pub fn hash_node_id(node_id: &str) -> NodeId {
1861    // FNV-1a: deterministic, platform-independent, and fast for short strings
1862    let mut hash: u64 = 0xcbf29ce484222325; // FNV-1a offset basis
1863    for byte in node_id.as_bytes() {
1864        hash ^= *byte as u64;
1865        hash = hash.wrapping_mul(0x100000001b3); // FNV-1a prime
1866    }
1867    hash
1868}
1869
1870// ============================================================================
1871// Legacy Compatibility
1872// ============================================================================
1873
1874/// Legacy type alias for backward compatibility
1875pub type RaftNodeId = NodeId;
1876
/// Legacy type alias for the former `RaftController` API
1878pub type RaftController = RaftNode;
1879
1880/// Re-export for lib.rs
1881pub use openraft::storage::RaftLogStorage as RaftLogStorageTrait;
1882
#[cfg(test)]
mod tests {
    use super::*;
    use openraft::storage::RaftLogStorage;
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_log_storage_creation() {
        let dir = TempDir::new().unwrap();
        let db_path = dir.path().join("raft.redb");
        let mut store = LogStore::new(&db_path).unwrap();

        // A freshly created store must report an empty log.
        let state = store.get_log_state().await.unwrap();
        assert!(state.last_log_id.is_none());
    }

    #[tokio::test]
    async fn test_state_machine_apply() {
        let dir = TempDir::new().unwrap();
        let machine = StateMachine::new(dir.path().join("snapshots"));
        let log_id = LogId::new(openraft::CommittedLeaderId::new(1, 1), 1);

        // Three partitions, each assigned to the same single node.
        let cmd = MetadataCommand::CreateTopic {
            config: crate::partition::TopicConfig::new("test-topic", 3, 1),
            partition_assignments: vec![
                vec!["node-1".into()],
                vec!["node-1".into()],
                vec!["node-1".into()],
            ],
        };

        let applied = machine.apply_command(&log_id, cmd).await;
        assert!(matches!(
            applied.response,
            MetadataResponse::TopicCreated { .. }
        ));

        // The topic must now be visible through the metadata view.
        let metadata = machine.metadata().await;
        assert!(metadata.topics.contains_key("test-topic"));
    }

    #[tokio::test]
    async fn test_raft_node_standalone() {
        let dir = TempDir::new().unwrap();
        let config = ClusterConfig {
            data_dir: dir.path().to_path_buf(),
            ..ClusterConfig::standalone()
        };

        let mut node = RaftNode::new(&config).await.unwrap();
        node.start().await.unwrap();

        // A standalone node is trivially its own leader.
        assert!(node.is_leader());

        // A Noop proposal should round-trip to a Success response.
        let response = node.propose(MetadataCommand::Noop).await.unwrap();
        assert!(matches!(response, MetadataResponse::Success));
    }

    #[test]
    fn test_hash_node_id() {
        // Deterministic for identical input; distinct for different inputs.
        assert_eq!(hash_node_id("node-1"), hash_node_id("node-1"));
        assert_ne!(hash_node_id("node-1"), hash_node_id("node-2"));
    }
}
1953}