rivven-core 0.0.21

Core library for Rivven distributed event streaming platform
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
//! Encryption at rest for Rivven data
//!
//! This module provides transparent encryption for WAL files, segments, and other
//! persistent data. It uses AES-256-GCM for authenticated encryption with per-record
//! nonces derived from LSN to ensure unique encryption contexts.
//!
//! # Features
//!
//! - **AES-256-GCM** authenticated encryption with 12-byte nonces
//! - **Key derivation** using HKDF-SHA256 from master key
//! - **Key rotation** support with key versioning
//! - **Envelope encryption** for data keys encrypted by master key
//! - **Zero-copy decryption** where possible
//!
//! # Security Properties
//!
//! - Confidentiality: AES-256 encryption
//! - Integrity: GCM authentication tag (16 bytes)
//! - Replay protection: LSN-derived nonces prevent nonce reuse
//!
//! # Example
//!
//! ```rust,ignore
//! use rivven_core::encryption::{EncryptionConfig, KeyProvider, EncryptionManager};
//!
//! // Create encryption config
//! let config = EncryptionConfig::new()
//!     .with_algorithm(Algorithm::Aes256Gcm)
//!     .with_key_provider(KeyProvider::File("/path/to/master.key".into()));
//!
//! // Initialize encryption manager
//! let manager = EncryptionManager::new(config).await?;
//!
//! // Encrypt data
//! let ciphertext = manager.encrypt(b"sensitive data", 12345)?;
//!
//! // Decrypt data  
//! let plaintext = manager.decrypt(&ciphertext, 12345)?;
//! ```

use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, AES_256_GCM, CHACHA20_POLY1305};
use ring::hkdf::{Salt, HKDF_SHA256};
use ring::rand::{SecureRandom, SystemRandom};
use secrecy::{ExposeSecret, SecretBox};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
use std::fs;
use std::io;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use thiserror::Error;

/// AES-256-GCM nonce size in bytes
pub use crate::crypto::NONCE_SIZE;

/// AES-256-GCM authentication tag size in bytes
pub use crate::crypto::TAG_SIZE;

/// AES-256 key size in bytes
pub use crate::crypto::KEY_SIZE;

/// Shared key material type
pub use crate::crypto::{KeyInfo, KeyMaterial};

/// Encryption header magic bytes
const ENCRYPTION_MAGIC: [u8; 4] = [0x52, 0x56, 0x45, 0x4E]; // "RVEN"

/// Current encryption format version
const FORMAT_VERSION: u8 = 1;

/// Encryption errors
#[derive(Debug, Error)]
pub enum EncryptionError {
    #[error("key provider error: {0}")]
    KeyProvider(String),

    #[error("encryption failed: {0}")]
    Encryption(String),

    #[error("decryption failed: {0}")]
    Decryption(String),

    #[error("invalid key: {0}")]
    InvalidKey(String),

    #[error("key rotation error: {0}")]
    KeyRotation(String),

    #[error("io error: {0}")]
    Io(#[from] io::Error),

    #[error("invalid format: {0}")]
    InvalidFormat(String),

    #[error("unsupported version: {0}")]
    UnsupportedVersion(u8),

    #[error("key not found: version {0}")]
    KeyNotFound(u32),
}

/// Result type for encryption operations
pub type Result<T> = std::result::Result<T, EncryptionError>;

/// Encryption algorithm
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum Algorithm {
    /// AES-256-GCM (recommended)
    #[default]
    Aes256Gcm,
    /// ChaCha20-Poly1305 (alternative for systems without AES-NI)
    ChaCha20Poly1305,
}

impl Algorithm {
    /// Get the key size for this algorithm
    pub fn key_size(&self) -> usize {
        match self {
            Algorithm::Aes256Gcm => 32,
            Algorithm::ChaCha20Poly1305 => 32,
        }
    }

    /// Get the nonce size for this algorithm
    pub fn nonce_size(&self) -> usize {
        match self {
            Algorithm::Aes256Gcm => 12,
            Algorithm::ChaCha20Poly1305 => 12,
        }
    }

    /// Get the tag size for this algorithm
    pub fn tag_size(&self) -> usize {
        match self {
            Algorithm::Aes256Gcm => 16,
            Algorithm::ChaCha20Poly1305 => 16,
        }
    }
}

/// Key provider for encryption keys
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "kebab-case")]
pub enum KeyProvider {
    /// Read key from a file (hex-encoded or raw 32 bytes)
    File { path: PathBuf },

    /// Key from environment variable (hex-encoded)
    Environment { variable: String },

    // Future: AWS KMS key (feature = "aws-kms")
    // Future: HashiCorp Vault (feature = "vault")
    /// In-memory key (for testing only)
    #[serde(skip)]
    InMemory(#[serde(skip)] Vec<u8>),
}

impl Default for KeyProvider {
    fn default() -> Self {
        KeyProvider::Environment {
            variable: "RIVVEN_ENCRYPTION_KEY".to_string(),
        }
    }
}

/// Encryption configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EncryptionConfig {
    /// Whether encryption is enabled
    #[serde(default)]
    pub enabled: bool,

    /// Encryption algorithm
    #[serde(default)]
    pub algorithm: Algorithm,

    /// Key provider
    #[serde(default)]
    pub key_provider: KeyProvider,

    /// Key rotation interval in days (0 = no rotation)
    #[serde(default)]
    pub key_rotation_days: u32,

    /// Additional authenticated data scope
    #[serde(default = "default_aad_scope")]
    pub aad_scope: String,
}

fn default_aad_scope() -> String {
    "rivven".to_string()
}

impl Default for EncryptionConfig {
    fn default() -> Self {
        Self {
            enabled: false,
            algorithm: Algorithm::default(),
            key_provider: KeyProvider::default(),
            key_rotation_days: 0,
            aad_scope: default_aad_scope(),
        }
    }
}

impl EncryptionConfig {
    /// Create a new encryption config with defaults
    pub fn new() -> Self {
        Self::default()
    }

    /// Enable encryption
    pub fn enabled(mut self) -> Self {
        self.enabled = true;
        self
    }

    /// Set the encryption algorithm
    pub fn with_algorithm(mut self, algorithm: Algorithm) -> Self {
        self.algorithm = algorithm;
        self
    }

    /// Set the key provider
    pub fn with_key_provider(mut self, provider: KeyProvider) -> Self {
        self.key_provider = provider;
        self
    }

    /// Set key rotation interval
    pub fn with_key_rotation_days(mut self, days: u32) -> Self {
        self.key_rotation_days = days;
        self
    }
}

/// Encrypted data header
///
/// Format:
/// - Magic (4 bytes): "RVEN"
/// - Version (1 byte)
/// - Algorithm (1 byte)
/// - Key version (4 bytes)
/// - Nonce (12 bytes)
/// - Reserved (2 bytes)
/// - Ciphertext + Tag (variable)
#[derive(Debug, Clone)]
pub struct EncryptedHeader {
    pub version: u8,
    pub algorithm: Algorithm,
    pub key_version: u32,
    pub nonce: [u8; NONCE_SIZE],
}

impl EncryptedHeader {
    /// Header size in bytes
    pub const SIZE: usize = 24;

    /// Create a new header
    pub fn new(algorithm: Algorithm, key_version: u32, nonce: [u8; NONCE_SIZE]) -> Self {
        Self {
            version: FORMAT_VERSION,
            algorithm,
            key_version,
            nonce,
        }
    }

    /// Serialize header to bytes
    pub fn to_bytes(&self) -> [u8; Self::SIZE] {
        let mut buf = [0u8; Self::SIZE];
        buf[0..4].copy_from_slice(&ENCRYPTION_MAGIC);
        buf[4] = self.version;
        buf[5] = self.algorithm as u8;
        buf[6..10].copy_from_slice(&self.key_version.to_be_bytes());
        buf[10..22].copy_from_slice(&self.nonce);
        // bytes 22-23 reserved
        buf
    }

    /// Parse header from bytes
    pub fn from_bytes(data: &[u8]) -> Result<Self> {
        if data.len() < Self::SIZE {
            return Err(EncryptionError::InvalidFormat(format!(
                "header too short: {} < {}",
                data.len(),
                Self::SIZE
            )));
        }

        // Check magic
        if data[0..4] != ENCRYPTION_MAGIC {
            return Err(EncryptionError::InvalidFormat("invalid magic bytes".into()));
        }

        let version = data[4];
        if version != FORMAT_VERSION {
            return Err(EncryptionError::UnsupportedVersion(version));
        }

        let algorithm = match data[5] {
            0 => Algorithm::Aes256Gcm,
            1 => Algorithm::ChaCha20Poly1305,
            v => {
                return Err(EncryptionError::InvalidFormat(format!(
                    "unknown algorithm: {}",
                    v
                )))
            }
        };

        let key_version = u32::from_be_bytes([data[6], data[7], data[8], data[9]]);
        let mut nonce = [0u8; NONCE_SIZE];
        nonce.copy_from_slice(&data[10..22]);

        Ok(Self {
            version,
            algorithm,
            key_version,
            nonce,
        })
    }
}

/// Master key wrapper with secure memory handling
pub struct MasterKey {
    key: SecretBox<[u8; KEY_SIZE]>,
    version: u32,
}

impl MasterKey {
    /// Create a new master key from raw bytes
    pub fn new(key: Vec<u8>, version: u32) -> Result<Self> {
        if key.len() != KEY_SIZE {
            return Err(EncryptionError::InvalidKey(format!(
                "key must be {} bytes, got {}",
                KEY_SIZE,
                key.len()
            )));
        }
        let mut key_array = [0u8; KEY_SIZE];
        key_array.copy_from_slice(&key);
        Ok(Self {
            key: SecretBox::new(Box::new(key_array)),
            version,
        })
    }

    /// Generate a new random master key
    pub fn generate(version: u32) -> Result<Self> {
        let rng = SystemRandom::new();
        let mut key = vec![0u8; KEY_SIZE];
        rng.fill(&mut key)
            .map_err(|_| EncryptionError::KeyProvider("failed to generate random key".into()))?;
        Self::new(key, version)
    }

    /// Load master key from provider
    pub fn from_provider(provider: &KeyProvider) -> Result<Self> {
        match provider {
            KeyProvider::File { path } => {
                let data = fs::read(path)?;
                let key = if data.len() == KEY_SIZE {
                    // Raw binary key
                    data
                } else {
                    // Try hex-encoded
                    let hex_str = String::from_utf8(data)
                        .map_err(|_| EncryptionError::InvalidKey("invalid key file format".into()))?
                        .trim()
                        .to_string();
                    hex::decode(&hex_str).map_err(|e| {
                        EncryptionError::InvalidKey(format!("invalid hex key: {}", e))
                    })?
                };
                Self::new(key, 1)
            }
            KeyProvider::Environment { variable } => {
                let hex_key = std::env::var(variable).map_err(|_| {
                    EncryptionError::KeyProvider(format!(
                        "environment variable '{}' not set",
                        variable
                    ))
                })?;
                let key = hex::decode(hex_key.trim()).map_err(|e| {
                    EncryptionError::InvalidKey(format!("invalid hex key in env var: {}", e))
                })?;
                Self::new(key, 1)
            }
            KeyProvider::InMemory(key) => Self::new(key.clone(), 1),
            #[allow(unreachable_patterns)]
            _ => Err(EncryptionError::KeyProvider(
                "unsupported key provider".into(),
            )),
        }
    }

    /// Get key version
    pub fn version(&self) -> u32 {
        self.version
    }

    /// Derive a data encryption key using HKDF
    fn derive_data_key(&self, info: &[u8]) -> Result<[u8; KEY_SIZE]> {
        let salt = Salt::new(HKDF_SHA256, b"rivven-encryption-v1");
        let prk = salt.extract(self.key.expose_secret());
        let info_refs = [info];
        let okm = prk
            .expand(&info_refs, DataKeyLen)
            .map_err(|_| EncryptionError::Encryption("key derivation failed".into()))?;

        let mut data_key = [0u8; KEY_SIZE];
        okm.fill(&mut data_key)
            .map_err(|_| EncryptionError::Encryption("key expansion failed".into()))?;
        Ok(data_key)
    }
}

impl fmt::Debug for MasterKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("MasterKey")
            .field("version", &self.version)
            .field("key", &"[REDACTED]")
            .finish()
    }
}

/// HKDF output length marker
struct DataKeyLen;

impl ring::hkdf::KeyType for DataKeyLen {
    fn len(&self) -> usize {
        KEY_SIZE
    }
}

/// Encryption manager for transparent encryption at rest
pub struct EncryptionManager {
    config: EncryptionConfig,
    master_key: MasterKey,
    /// All known keys indexed by version (for encrypting + decrypting)
    ///
    /// `parking_lot::RwLock` is intentional — critical sections are O(1)
    /// HashMap lookups/inserts and never held across `.await` points.
    key_store: parking_lot::RwLock<HashMap<u32, LessSafeKey>>,
    rng: SystemRandom,
    current_key_version: AtomicU32,
}

/// Select the ring AEAD algorithm based on our Algorithm enum
fn ring_algorithm(algo: Algorithm) -> &'static ring::aead::Algorithm {
    match algo {
        Algorithm::Aes256Gcm => &AES_256_GCM,
        Algorithm::ChaCha20Poly1305 => &CHACHA20_POLY1305,
    }
}

impl EncryptionManager {
    /// Create a new encryption manager
    pub fn new(config: EncryptionConfig) -> Result<Arc<Self>> {
        let master_key = MasterKey::from_provider(&config.key_provider)?;
        let data_key_bytes = master_key.derive_data_key(config.aad_scope.as_bytes())?;

        let algo = ring_algorithm(config.algorithm);

        // Populate the key store with the initial key version
        let version = master_key.version();
        let store_unbound = UnboundKey::new(algo, &data_key_bytes)
            .map_err(|_| EncryptionError::InvalidKey("failed to create store key".into()))?;
        let mut key_store = HashMap::new();
        key_store.insert(version, LessSafeKey::new(store_unbound));

        Ok(Arc::new(Self {
            config,
            current_key_version: AtomicU32::new(version),
            master_key,
            key_store: parking_lot::RwLock::new(key_store),
            rng: SystemRandom::new(),
        }))
    }

    /// Rotate the data encryption key, deriving a new key from a new master key.
    /// Old keys are retained in the key store so existing data can still be decrypted.
    ///
    /// The key is inserted into the store BEFORE the version is bumped,
    /// ensuring that any concurrent decrypt using the new version can always find
    /// the key. We use Acquire for the read and Release for the write to establish
    /// a happens-before relationship.
    pub fn rotate_key(&self, new_master: MasterKey) -> Result<()> {
        let new_version = new_master.version();
        if new_version <= self.current_key_version.load(Ordering::Acquire) {
            return Err(EncryptionError::KeyRotation(
                "new key version must be greater than current".into(),
            ));
        }

        let data_key_bytes = new_master.derive_data_key(self.config.aad_scope.as_bytes())?;
        let algo = ring_algorithm(self.config.algorithm);

        let new_key = UnboundKey::new(algo, &data_key_bytes)
            .map_err(|_| EncryptionError::KeyRotation("failed to create new key".into()))?;

        // Add to key store
        {
            let mut store = self.key_store.write();
            store.insert(new_version, LessSafeKey::new(new_key));
        }

        // Atomically bump the current version — new encryptions will use this version
        self.current_key_version
            .store(new_version, Ordering::Release);

        Ok(())
    }

    /// Get a key by version for decryption
    fn get_key_for_version(&self, version: u32) -> Result<()> {
        let store = self.key_store.read();
        if store.contains_key(&version) {
            Ok(())
        } else {
            Err(EncryptionError::KeyNotFound(version))
        }
    }

    /// Create a no-op encryption manager for when encryption is disabled
    pub fn disabled() -> Arc<DisabledEncryption> {
        Arc::new(DisabledEncryption)
    }

    /// Check if encryption is enabled
    pub fn is_enabled(&self) -> bool {
        self.config.enabled
    }

    /// Get current key version
    pub fn key_version(&self) -> u32 {
        self.current_key_version.load(Ordering::Relaxed)
    }

    /// Generate a fully random 96-bit nonce for AES-256-GCM.
    ///
    /// Uses the full 96-bit nonce space (all random) instead of LSN+random.
    /// With random 96-bit nonces, the birthday bound allows ~$2^{48}$ encryptions
    /// per key before collision probability reaches $2^{-32}$ — far beyond any
    /// realistic WAL lifetime. The LSN parameter is retained for structured logging
    /// on RNG failure but does not constrain the nonce.
    ///
    /// propagate RNG failure instead of silently using zero bytes.
    fn generate_nonce(&self, lsn: u64) -> Result<[u8; NONCE_SIZE]> {
        let mut nonce = [0u8; NONCE_SIZE];
        self.rng.fill(&mut nonce).map_err(|_| {
            EncryptionError::Encryption(format!(
                "RNG failure during nonce generation for LSN {} — refusing to encrypt",
                lsn
            ))
        })?;
        Ok(nonce)
    }

    /// Encrypt data with associated LSN for nonce derivation
    ///
    /// uses key_store lookup by current_key_version instead of
    /// the stale `self.data_key` field, so post-rotation encryptions use the
    /// correct key.
    pub fn encrypt(&self, plaintext: &[u8], lsn: u64) -> Result<Vec<u8>> {
        let nonce_bytes = self.generate_nonce(lsn)?;
        let nonce = Nonce::assume_unique_for_key(nonce_bytes);

        let version = self.current_key_version.load(Ordering::Acquire);
        let header = EncryptedHeader::new(self.config.algorithm, version, nonce_bytes);

        // Allocate buffer: header + plaintext + tag
        let mut output = Vec::with_capacity(EncryptedHeader::SIZE + plaintext.len() + TAG_SIZE);
        output.extend_from_slice(&header.to_bytes());
        output.extend_from_slice(plaintext);

        // look up the current key from key_store (not self.data_key)
        let store = self.key_store.read();
        let key = store
            .get(&version)
            .ok_or(EncryptionError::KeyNotFound(version))?;

        // Encrypt in-place using separate tag method
        let ciphertext_start = EncryptedHeader::SIZE;
        let tag = key
            .seal_in_place_separate_tag(
                nonce,
                Aad::from(self.config.aad_scope.as_bytes()),
                &mut output[ciphertext_start..],
            )
            .map_err(|_| EncryptionError::Encryption("seal failed".into()))?;

        // Append the authentication tag
        output.extend_from_slice(tag.as_ref());

        Ok(output)
    }

    /// Decrypt data
    pub fn decrypt(&self, ciphertext: &[u8], _lsn: u64) -> Result<Vec<u8>> {
        if ciphertext.len() < EncryptedHeader::SIZE + TAG_SIZE {
            return Err(EncryptionError::InvalidFormat(
                "ciphertext too short".into(),
            ));
        }

        let header = EncryptedHeader::from_bytes(ciphertext)?;

        // Look up key by version from the key store (supports rotated keys)
        self.get_key_for_version(header.key_version)?;

        let nonce = Nonce::assume_unique_for_key(header.nonce);

        // Select the correct algorithm from the header (supports cross-algorithm decrypt)
        let algo = ring_algorithm(header.algorithm);

        // Copy ciphertext + tag for in-place decryption
        let mut buffer = ciphertext[EncryptedHeader::SIZE..].to_vec();

        // We need to use the key for this specific version
        let store = self.key_store.read();
        let key = store
            .get(&header.key_version)
            .ok_or(EncryptionError::KeyNotFound(header.key_version))?;

        // Verify the algorithm matches the key
        if *key.algorithm() != *algo {
            // Key was created with a different algorithm — re-derive
            drop(store);
            // For cross-algorithm decrypt: derive key bytes from master, create a temp key
            let data_key_bytes = self
                .master_key
                .derive_data_key(self.config.aad_scope.as_bytes())?;
            let unbound = UnboundKey::new(algo, &data_key_bytes)
                .map_err(|_| EncryptionError::Decryption("key re-derive failed".into()))?;
            let temp_key = LessSafeKey::new(unbound);
            let plaintext = temp_key
                .open_in_place(
                    nonce,
                    Aad::from(self.config.aad_scope.as_bytes()),
                    &mut buffer,
                )
                .map_err(|_| EncryptionError::Decryption("authentication failed".into()))?;
            return Ok(plaintext.to_vec());
        }

        let plaintext = key
            .open_in_place(
                nonce,
                Aad::from(self.config.aad_scope.as_bytes()),
                &mut buffer,
            )
            .map_err(|_| EncryptionError::Decryption("authentication failed".into()))?;

        Ok(plaintext.to_vec())
    }

    /// Calculate encrypted size for given plaintext size
    pub fn encrypted_size(&self, plaintext_len: usize) -> usize {
        EncryptedHeader::SIZE + plaintext_len + TAG_SIZE
    }
}

impl fmt::Debug for EncryptionManager {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("EncryptionManager")
            .field("enabled", &self.config.enabled)
            .field("algorithm", &self.config.algorithm)
            .field("key_version", &self.key_version())
            .finish()
    }
}

/// Placeholder for disabled encryption
#[derive(Debug)]
pub struct DisabledEncryption;

impl DisabledEncryption {
    /// Pass-through "encrypt" (just returns the data as-is)
    pub fn encrypt(&self, plaintext: &[u8], _lsn: u64) -> Result<Vec<u8>> {
        Ok(plaintext.to_vec())
    }

    /// Pass-through "decrypt" (just returns the data as-is)
    pub fn decrypt(&self, ciphertext: &[u8], _lsn: u64) -> Result<Vec<u8>> {
        Ok(ciphertext.to_vec())
    }

    /// No overhead when disabled
    pub fn encrypted_size(&self, plaintext_len: usize) -> usize {
        plaintext_len
    }

    pub fn is_enabled(&self) -> bool {
        false
    }
}

/// Trait for encryption operations (allows dynamic dispatch)
pub trait Encryptor: Send + Sync + std::fmt::Debug {
    fn encrypt(&self, plaintext: &[u8], lsn: u64) -> Result<Vec<u8>>;
    fn decrypt(&self, ciphertext: &[u8], lsn: u64) -> Result<Vec<u8>>;
    fn encrypted_size(&self, plaintext_len: usize) -> usize;
    fn is_enabled(&self) -> bool;
}

impl Encryptor for EncryptionManager {
    fn encrypt(&self, plaintext: &[u8], lsn: u64) -> Result<Vec<u8>> {
        self.encrypt(plaintext, lsn)
    }

    fn decrypt(&self, ciphertext: &[u8], lsn: u64) -> Result<Vec<u8>> {
        self.decrypt(ciphertext, lsn)
    }

    fn encrypted_size(&self, plaintext_len: usize) -> usize {
        self.encrypted_size(plaintext_len)
    }

    fn is_enabled(&self) -> bool {
        self.is_enabled()
    }
}

impl Encryptor for DisabledEncryption {
    fn encrypt(&self, plaintext: &[u8], lsn: u64) -> Result<Vec<u8>> {
        self.encrypt(plaintext, lsn)
    }

    fn decrypt(&self, ciphertext: &[u8], lsn: u64) -> Result<Vec<u8>> {
        self.decrypt(ciphertext, lsn)
    }

    fn encrypted_size(&self, plaintext_len: usize) -> usize {
        self.encrypted_size(plaintext_len)
    }

    fn is_enabled(&self) -> bool {
        false
    }
}

/// Generate a new encryption key and save to file
pub fn generate_key_file(path: &std::path::Path) -> Result<()> {
    let key = MasterKey::generate(1)?;
    let hex_key = hex::encode(key.key.expose_secret());
    fs::write(path, hex_key)?;

    // Set restrictive permissions on Unix
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        let mut perms = fs::metadata(path)?.permissions();
        perms.set_mode(0o600);
        fs::set_permissions(path, perms)?;
    }

    // On Windows, restrict the key file as much as possible.
    // Windows does not have Unix-style file modes; set read-only to limit
    // exposure and warn the operator to verify ACLs manually.
    #[cfg(windows)]
    {
        let mut perms = fs::metadata(path)?.permissions();
        perms.set_readonly(true);
        fs::set_permissions(path, perms)?;
        tracing::warn!(
            path = %path.display(),
            "Key file created on Windows \u{2014} manually verify file ACLs restrict access to current user only"
        );
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    fn test_config() -> EncryptionConfig {
        let key = vec![0u8; 32]; // Test key
        EncryptionConfig {
            enabled: true,
            algorithm: Algorithm::Aes256Gcm,
            key_provider: KeyProvider::InMemory(key),
            key_rotation_days: 0,
            aad_scope: "test".to_string(),
        }
    }

    #[test]
    fn test_encrypt_decrypt() {
        let manager = EncryptionManager::new(test_config()).unwrap();
        let plaintext = b"Hello, World! This is sensitive data.";
        let lsn = 12345u64;

        let ciphertext = manager.encrypt(plaintext, lsn).unwrap();
        assert_ne!(ciphertext.as_slice(), plaintext);
        assert!(ciphertext.len() > plaintext.len());

        let decrypted = manager.decrypt(&ciphertext, lsn).unwrap();
        assert_eq!(decrypted, plaintext);
    }

    #[test]
    fn test_encrypted_size() {
        let manager = EncryptionManager::new(test_config()).unwrap();
        let plaintext_len = 1000;

        let expected = EncryptedHeader::SIZE + plaintext_len + TAG_SIZE;
        assert_eq!(manager.encrypted_size(plaintext_len), expected);
    }

    #[test]
    fn test_header_roundtrip() {
        let nonce = [1u8; NONCE_SIZE];
        let header = EncryptedHeader::new(Algorithm::Aes256Gcm, 42, nonce);

        let bytes = header.to_bytes();
        let parsed = EncryptedHeader::from_bytes(&bytes).unwrap();

        assert_eq!(parsed.version, header.version);
        assert_eq!(parsed.algorithm, header.algorithm);
        assert_eq!(parsed.key_version, header.key_version);
        assert_eq!(parsed.nonce, header.nonce);
    }

    #[test]
    fn test_invalid_ciphertext() {
        let manager = EncryptionManager::new(test_config()).unwrap();

        // Too short
        let result = manager.decrypt(&[0u8; 10], 1);
        assert!(result.is_err());

        // Invalid magic
        let mut bad_magic = vec![0u8; 100];
        let result = manager.decrypt(&bad_magic, 1);
        assert!(result.is_err());

        // Valid header but tampered ciphertext
        bad_magic[0..4].copy_from_slice(&ENCRYPTION_MAGIC);
        bad_magic[4] = FORMAT_VERSION;
        let result = manager.decrypt(&bad_magic, 1);
        assert!(result.is_err());
    }

    #[test]
    fn test_tamper_detection() {
        let manager = EncryptionManager::new(test_config()).unwrap();
        let plaintext = b"Sensitive data that must not be tampered with";

        let mut ciphertext = manager.encrypt(plaintext, 1).unwrap();

        // Tamper with ciphertext (flip a bit in the data portion)
        let tamper_pos = EncryptedHeader::SIZE + 10;
        ciphertext[tamper_pos] ^= 0x01;

        // Decryption should fail due to authentication
        let result = manager.decrypt(&ciphertext, 1);
        assert!(result.is_err());
    }

    #[test]
    fn test_different_lsns_produce_different_ciphertexts() {
        let manager = EncryptionManager::new(test_config()).unwrap();
        let plaintext = b"Same plaintext";

        let ct1 = manager.encrypt(plaintext, 1).unwrap();
        let ct2 = manager.encrypt(plaintext, 2).unwrap();

        // Same plaintext should produce different ciphertexts for different LSNs
        assert_ne!(ct1, ct2);

        // Both should decrypt correctly
        assert_eq!(manager.decrypt(&ct1, 1).unwrap(), plaintext);
        assert_eq!(manager.decrypt(&ct2, 2).unwrap(), plaintext);
    }

    #[test]
    fn test_disabled_encryption_passthrough() {
        let disabled = DisabledEncryption;
        let plaintext = b"Not encrypted";

        let encrypted = disabled.encrypt(plaintext, 1).unwrap();
        assert_eq!(&encrypted[..], plaintext);

        let decrypted = disabled.decrypt(plaintext, 1).unwrap();
        assert_eq!(&decrypted[..], plaintext);

        assert_eq!(disabled.encrypted_size(100), 100);
        assert!(!disabled.is_enabled());
    }

    #[test]
    fn test_master_key_validation() {
        // Wrong key size
        let result = MasterKey::new(vec![0u8; 16], 1);
        assert!(result.is_err());

        // Correct size
        let result = MasterKey::new(vec![0u8; 32], 1);
        assert!(result.is_ok());
    }

    #[test]
    fn test_key_derivation_consistency() {
        let key = MasterKey::new(vec![42u8; 32], 1).unwrap();

        let dk1 = key.derive_data_key(b"scope1").unwrap();
        let dk2 = key.derive_data_key(b"scope1").unwrap();
        let dk3 = key.derive_data_key(b"scope2").unwrap();

        // Same scope should derive same key
        assert_eq!(dk1, dk2);

        // Different scopes should derive different keys
        assert_ne!(dk1, dk3);
    }

    #[test]
    fn test_large_data_encryption() {
        let manager = EncryptionManager::new(test_config()).unwrap();

        // Test with 1MB of data
        let plaintext: Vec<u8> = (0..1_000_000).map(|i| (i % 256) as u8).collect();

        let ciphertext = manager.encrypt(&plaintext, 999999).unwrap();
        let decrypted = manager.decrypt(&ciphertext, 999999).unwrap();

        assert_eq!(decrypted, plaintext);
    }

    #[test]
    fn test_generate_key() {
        let key = MasterKey::generate(1).unwrap();
        assert_eq!(key.version(), 1);
    }

    #[test]
    fn test_chacha20_poly1305_encrypt_decrypt() {
        let config = EncryptionConfig {
            enabled: true,
            algorithm: Algorithm::ChaCha20Poly1305,
            key_provider: KeyProvider::InMemory(vec![0u8; 32]),
            key_rotation_days: 0,
            aad_scope: "test".to_string(),
        };
        let manager = EncryptionManager::new(config).unwrap();
        let plaintext = b"ChaCha20-Poly1305 test payload";
        let lsn = 42u64;

        let ciphertext = manager.encrypt(plaintext, lsn).unwrap();
        assert_ne!(ciphertext.as_slice(), plaintext.as_slice());

        // Verify header encodes ChaCha20
        let header = EncryptedHeader::from_bytes(&ciphertext).unwrap();
        assert_eq!(header.algorithm, Algorithm::ChaCha20Poly1305);

        let decrypted = manager.decrypt(&ciphertext, lsn).unwrap();
        assert_eq!(decrypted, plaintext);
    }

    #[test]
    fn test_key_rotation() {
        let config = EncryptionConfig {
            enabled: true,
            algorithm: Algorithm::Aes256Gcm,
            key_provider: KeyProvider::InMemory(vec![1u8; 32]),
            key_rotation_days: 30,
            aad_scope: "test".to_string(),
        };
        let manager = EncryptionManager::new(config).unwrap();

        // Encrypt with version 1
        let plaintext = b"data encrypted with key v1";
        let ct_v1 = manager.encrypt(plaintext, 100).unwrap();
        assert_eq!(manager.key_version(), 1);

        // Rotate to version 2
        let new_master = MasterKey::new(vec![2u8; 32], 2).unwrap();
        manager.rotate_key(new_master).unwrap();
        assert_eq!(manager.key_version(), 2);

        // Old ciphertext (v1) still decryptable
        let decrypted = manager.decrypt(&ct_v1, 100).unwrap();
        assert_eq!(decrypted, plaintext);

        // Rotation with same or lower version should fail
        let bad_master = MasterKey::new(vec![3u8; 32], 1).unwrap();
        assert!(manager.rotate_key(bad_master).is_err());
    }
}