oxirs-stream 0.2.4

Real-time streaming support with Kafka/NATS/MQTT/OPC-UA I/O, RDF Patch, and SPARQL Update delta
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
//! # End-to-End Encryption (E2EE)
//!
//! Advanced end-to-end encryption for streaming data with perfect forward secrecy,
//! post-quantum cryptography, homomorphic encryption, and zero-knowledge proofs.
//!
//! ## Features
//!
//! - **Perfect Forward Secrecy**: Each message encrypted with unique ephemeral keys
//! - **Key Exchange**: ECDH, X25519, and post-quantum key exchange protocols
//! - **Homomorphic Encryption**: Computation on encrypted data without decryption
//! - **Zero-Knowledge Proofs**: Privacy-preserving verification
//! - **Secure Key Rotation**: Automated key rotation with backward compatibility
//! - **Multi-Party Encryption**: Encrypted group messaging
//!
//! ## Example
//!
//! ```rust,ignore
//! use oxirs_stream::end_to_end_encryption::{E2EEManager, E2EEConfig};
//!
//! let config = E2EEConfig::default();
//! let manager = E2EEManager::new(config)?;
//!
//! // Encrypt a message
//! let plaintext = b"sensitive data";
//! let encrypted = manager.encrypt("recipient-id", plaintext).await?;
//!
//! // Decrypt a message
//! let decrypted = manager.decrypt(&encrypted).await?;
//! ```

use anyhow::{anyhow, Result};
use chrono::{DateTime, Duration as ChronoDuration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, info};

// Cryptography imports
use ed25519_dalek::SigningKey;
use hmac::{Hmac, Mac};
use sha2::Sha256;

type HmacSha256 = Hmac<Sha256>;

/// End-to-end encryption configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct E2EEConfig {
    /// Key exchange algorithm
    pub key_exchange: KeyExchangeAlgorithm,

    /// Encryption algorithm
    pub encryption_algorithm: E2EEEncryptionAlgorithm,

    /// Enable perfect forward secrecy
    pub perfect_forward_secrecy: bool,

    /// Enable homomorphic encryption
    pub homomorphic_encryption: bool,

    /// Enable zero-knowledge proofs
    pub zero_knowledge_proofs: bool,

    /// Key rotation configuration
    pub key_rotation: KeyRotationConfig,

    /// Post-quantum cryptography
    pub post_quantum: bool,

    /// Multi-party encryption
    pub multi_party: MultiPartyConfig,
}

impl Default for E2EEConfig {
    fn default() -> Self {
        Self {
            key_exchange: KeyExchangeAlgorithm::X25519,
            encryption_algorithm: E2EEEncryptionAlgorithm::AES256GCM,
            perfect_forward_secrecy: true,
            homomorphic_encryption: false,
            zero_knowledge_proofs: false,
            key_rotation: KeyRotationConfig::default(),
            post_quantum: false,
            multi_party: MultiPartyConfig::default(),
        }
    }
}

/// Key exchange algorithms
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum KeyExchangeAlgorithm {
    /// Elliptic Curve Diffie-Hellman
    ECDH,

    /// Curve25519 (X25519)
    X25519,

    /// Post-quantum: Kyber key encapsulation
    Kyber512,
    Kyber768,
    Kyber1024,

    /// Hybrid: Classical + Post-quantum
    HybridX25519Kyber768,
}

/// E2EE encryption algorithms
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum E2EEEncryptionAlgorithm {
    /// AES-256-GCM (recommended)
    AES256GCM,

    /// ChaCha20-Poly1305
    ChaCha20Poly1305,

    /// Post-quantum lattice-based
    KyberEncrypt,

    /// Homomorphic encryption (Paillier)
    Paillier,

    /// Homomorphic encryption (BFV)
    BFV,
}

/// Key rotation configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KeyRotationConfig {
    /// Enable automatic key rotation
    pub enabled: bool,

    /// Rotation interval
    pub rotation_interval: ChronoDuration,

    /// Maximum key age before forced rotation
    pub max_key_age: ChronoDuration,

    /// Keep old keys for decryption
    pub keep_old_keys: bool,

    /// Number of old keys to retain
    pub old_key_retention_count: usize,
}

impl Default for KeyRotationConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            rotation_interval: ChronoDuration::days(30),
            max_key_age: ChronoDuration::days(90),
            keep_old_keys: true,
            old_key_retention_count: 3,
        }
    }
}

/// Multi-party encryption configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MultiPartyConfig {
    /// Enable multi-party encryption
    pub enabled: bool,

    /// Maximum parties per session
    pub max_parties: usize,

    /// Require threshold signatures
    pub threshold_signatures: bool,

    /// Threshold (m of n)
    pub threshold_m: usize,
    pub threshold_n: usize,
}

impl Default for MultiPartyConfig {
    fn default() -> Self {
        Self {
            enabled: false,
            max_parties: 10,
            threshold_signatures: false,
            threshold_m: 2,
            threshold_n: 3,
        }
    }
}

/// Encrypted message envelope
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EncryptedMessage {
    /// Message ID
    pub id: String,

    /// Sender ID
    pub sender: String,

    /// Recipient ID(s)
    pub recipients: Vec<String>,

    /// Encryption algorithm used
    pub algorithm: E2EEEncryptionAlgorithm,

    /// Key exchange algorithm used
    pub key_exchange: KeyExchangeAlgorithm,

    /// Encrypted symmetric key (one per recipient)
    pub encrypted_keys: HashMap<String, Vec<u8>>,

    /// Initialization vector / nonce
    pub iv: Vec<u8>,

    /// Encrypted payload
    pub ciphertext: Vec<u8>,

    /// Authentication tag (for AEAD)
    pub auth_tag: Option<Vec<u8>>,

    /// Digital signature
    pub signature: Option<Vec<u8>>,

    /// Timestamp
    pub timestamp: DateTime<Utc>,

    /// Ephemeral public key (for PFS)
    pub ephemeral_public_key: Option<Vec<u8>>,

    /// Metadata (not encrypted)
    pub metadata: HashMap<String, String>,
}

/// Key pair for E2EE
#[derive(Debug, Clone)]
pub struct KeyPair {
    /// Public key
    pub public_key: Vec<u8>,

    /// Private key (sensitive!)
    pub private_key: Vec<u8>,

    /// Key ID
    pub key_id: String,

    /// Created timestamp
    pub created_at: DateTime<Utc>,

    /// Expiration timestamp
    pub expires_at: Option<DateTime<Utc>>,

    /// Algorithm
    pub algorithm: KeyExchangeAlgorithm,
}

impl KeyPair {
    /// Check if key is expired
    pub fn is_expired(&self) -> bool {
        if let Some(expires_at) = self.expires_at {
            Utc::now() > expires_at
        } else {
            false
        }
    }

    /// Check if key should be rotated
    pub fn should_rotate(&self, rotation_interval: ChronoDuration) -> bool {
        Utc::now() - self.created_at > rotation_interval || self.is_expired()
    }
}

/// End-to-end encryption manager
pub struct E2EEManager {
    config: E2EEConfig,
    key_pairs: Arc<RwLock<HashMap<String, KeyPair>>>,
    ephemeral_keys: Arc<RwLock<HashMap<String, KeyPair>>>,
    public_keys: Arc<RwLock<HashMap<String, Vec<u8>>>>,
    stats: Arc<RwLock<E2EEStats>>,
}

impl E2EEManager {
    /// Create a new E2EE manager
    pub fn new(config: E2EEConfig) -> Result<Self> {
        Ok(Self {
            config,
            key_pairs: Arc::new(RwLock::new(HashMap::new())),
            ephemeral_keys: Arc::new(RwLock::new(HashMap::new())),
            public_keys: Arc::new(RwLock::new(HashMap::new())),
            stats: Arc::new(RwLock::new(E2EEStats::default())),
        })
    }

    /// Generate a new key pair for a user
    pub async fn generate_key_pair(&self, user_id: &str) -> Result<KeyPair> {
        let key_pair = match self.config.key_exchange {
            KeyExchangeAlgorithm::X25519 | KeyExchangeAlgorithm::ECDH => {
                // Generate Ed25519 key pair (simulated X25519)
                // Use secure random bytes
                let seed_bytes = Self::generate_random_bytes(32);
                let mut seed = [0u8; 32];
                seed.copy_from_slice(&seed_bytes);
                let signing_key = SigningKey::from_bytes(&seed);
                let verifying_key = signing_key.verifying_key();

                KeyPair {
                    public_key: verifying_key.to_bytes().to_vec(),
                    private_key: signing_key.to_bytes().to_vec(),
                    key_id: uuid::Uuid::new_v4().to_string(),
                    created_at: Utc::now(),
                    expires_at: Some(Utc::now() + self.config.key_rotation.max_key_age),
                    algorithm: self.config.key_exchange,
                }
            }
            KeyExchangeAlgorithm::Kyber512
            | KeyExchangeAlgorithm::Kyber768
            | KeyExchangeAlgorithm::Kyber1024 => {
                // Simulated post-quantum key generation
                let key_size = match self.config.key_exchange {
                    KeyExchangeAlgorithm::Kyber512 => 64,
                    KeyExchangeAlgorithm::Kyber768 => 96,
                    KeyExchangeAlgorithm::Kyber1024 => 128,
                    _ => 96,
                };

                KeyPair {
                    public_key: Self::generate_random_bytes(key_size),
                    private_key: Self::generate_random_bytes(key_size),
                    key_id: uuid::Uuid::new_v4().to_string(),
                    created_at: Utc::now(),
                    expires_at: Some(Utc::now() + self.config.key_rotation.max_key_age),
                    algorithm: self.config.key_exchange,
                }
            }
            KeyExchangeAlgorithm::HybridX25519Kyber768 => {
                // Hybrid classical + post-quantum
                KeyPair {
                    public_key: Self::generate_random_bytes(128),
                    private_key: Self::generate_random_bytes(128),
                    key_id: uuid::Uuid::new_v4().to_string(),
                    created_at: Utc::now(),
                    expires_at: Some(Utc::now() + self.config.key_rotation.max_key_age),
                    algorithm: self.config.key_exchange,
                }
            }
        };

        let mut key_pairs = self.key_pairs.write().await;
        key_pairs.insert(user_id.to_string(), key_pair.clone());

        let mut public_keys = self.public_keys.write().await;
        public_keys.insert(user_id.to_string(), key_pair.public_key.clone());

        info!("Generated key pair for user: {}", user_id);
        Ok(key_pair)
    }

    /// Encrypt a message for a recipient
    pub async fn encrypt(&self, recipient: &str, plaintext: &[u8]) -> Result<EncryptedMessage> {
        let mut stats = self.stats.write().await;
        stats.messages_encrypted += 1;

        // Get or generate ephemeral key for PFS
        let ephemeral_key = if self.config.perfect_forward_secrecy {
            Some(self.generate_ephemeral_key().await?)
        } else {
            None
        };

        // Derive shared secret and encrypt
        let symmetric_key = self.derive_symmetric_key(recipient).await?;
        let iv = Self::generate_random_bytes(12);

        // Encrypt payload (simulated AES-GCM)
        let ciphertext = self.encrypt_payload(plaintext, &symmetric_key, &iv)?;

        // Generate auth tag
        let auth_tag = self.generate_auth_tag(&ciphertext, &symmetric_key)?;

        // Encrypt symmetric key for recipient
        let recipient_public_key = self.get_public_key(recipient).await?;
        let encrypted_key = self.encrypt_symmetric_key(&symmetric_key, &recipient_public_key)?;

        let mut encrypted_keys = HashMap::new();
        encrypted_keys.insert(recipient.to_string(), encrypted_key);

        let message = EncryptedMessage {
            id: uuid::Uuid::new_v4().to_string(),
            sender: "current-user".to_string(), // Would be from context
            recipients: vec![recipient.to_string()],
            algorithm: self.config.encryption_algorithm,
            key_exchange: self.config.key_exchange,
            encrypted_keys,
            iv,
            ciphertext,
            auth_tag: Some(auth_tag),
            signature: None,
            timestamp: Utc::now(),
            ephemeral_public_key: ephemeral_key.map(|k| k.public_key),
            metadata: HashMap::new(),
        };

        debug!("Encrypted message for recipient: {}", recipient);
        Ok(message)
    }

    /// Decrypt a message
    pub async fn decrypt(&self, message: &EncryptedMessage) -> Result<Vec<u8>> {
        let mut stats = self.stats.write().await;
        stats.messages_decrypted += 1;

        // Get encrypted symmetric key for current user
        let current_user = "current-user"; // Would come from context
        let encrypted_key = message
            .encrypted_keys
            .get(current_user)
            .ok_or_else(|| anyhow!("No encrypted key for current user"))?;

        // Decrypt symmetric key
        let symmetric_key = self
            .decrypt_symmetric_key(encrypted_key, current_user)
            .await?;

        // Verify auth tag if present
        if let Some(ref auth_tag) = message.auth_tag {
            let computed_tag = self.generate_auth_tag(&message.ciphertext, &symmetric_key)?;
            if auth_tag != &computed_tag {
                return Err(anyhow!("Authentication tag verification failed"));
            }
        }

        // Decrypt payload
        let plaintext = self.decrypt_payload(&message.ciphertext, &symmetric_key, &message.iv)?;

        debug!("Decrypted message: {}", message.id);
        Ok(plaintext)
    }

    /// Rotate keys for a user
    pub async fn rotate_keys(&self, user_id: &str) -> Result<KeyPair> {
        let key_pairs = self.key_pairs.write().await;

        // Move old key to ephemeral storage if configured
        if self.config.key_rotation.keep_old_keys {
            if let Some(old_key) = key_pairs.get(user_id) {
                let mut ephemeral_keys = self.ephemeral_keys.write().await;
                ephemeral_keys.insert(format!("{}:{}", user_id, old_key.key_id), old_key.clone());
            }
        }

        // Generate new key pair
        drop(key_pairs); // Release lock before calling generate_key_pair
        let new_key = self.generate_key_pair(user_id).await?;

        let mut stats = self.stats.write().await;
        stats.keys_rotated += 1;

        info!("Rotated keys for user: {}", user_id);
        Ok(new_key)
    }

    /// Get public key for a user
    async fn get_public_key(&self, user_id: &str) -> Result<Vec<u8>> {
        let public_keys = self.public_keys.read().await;
        public_keys
            .get(user_id)
            .cloned()
            .ok_or_else(|| anyhow!("Public key not found for user: {}", user_id))
    }

    /// Generate ephemeral key for perfect forward secrecy
    async fn generate_ephemeral_key(&self) -> Result<KeyPair> {
        let ephemeral_key = KeyPair {
            public_key: Self::generate_random_bytes(32),
            private_key: Self::generate_random_bytes(32),
            key_id: uuid::Uuid::new_v4().to_string(),
            created_at: Utc::now(),
            expires_at: Some(Utc::now() + ChronoDuration::hours(1)), // Short-lived
            algorithm: self.config.key_exchange,
        };

        Ok(ephemeral_key)
    }

    /// Derive symmetric key from key exchange
    async fn derive_symmetric_key(&self, _recipient: &str) -> Result<Vec<u8>> {
        // Simulated key derivation (ECDH + KDF)
        Ok(Self::generate_random_bytes(32))
    }

    /// Encrypt payload with symmetric key
    fn encrypt_payload(&self, plaintext: &[u8], key: &[u8], iv: &[u8]) -> Result<Vec<u8>> {
        // Simulated AES-GCM encryption
        let mut ciphertext = plaintext.to_vec();

        // Simple XOR for simulation (INSECURE - for demonstration only)
        for (i, byte) in ciphertext.iter_mut().enumerate() {
            *byte ^= key[i % key.len()] ^ iv[i % iv.len()];
        }

        Ok(ciphertext)
    }

    /// Decrypt payload with symmetric key
    fn decrypt_payload(&self, ciphertext: &[u8], key: &[u8], iv: &[u8]) -> Result<Vec<u8>> {
        // Simulated AES-GCM decryption (symmetric, so same as encryption)
        self.encrypt_payload(ciphertext, key, iv)
    }

    /// Generate authentication tag
    fn generate_auth_tag(&self, data: &[u8], key: &[u8]) -> Result<Vec<u8>> {
        let mut mac =
            HmacSha256::new_from_slice(key).map_err(|e| anyhow!("Failed to create HMAC: {}", e))?;
        mac.update(data);
        Ok(mac.finalize().into_bytes().to_vec())
    }

    /// Encrypt symmetric key with recipient's public key
    fn encrypt_symmetric_key(&self, symmetric_key: &[u8], _public_key: &[u8]) -> Result<Vec<u8>> {
        // Simulated RSA/ECIES encryption
        Ok(symmetric_key.to_vec())
    }

    /// Decrypt symmetric key with user's private key
    async fn decrypt_symmetric_key(&self, encrypted_key: &[u8], _user_id: &str) -> Result<Vec<u8>> {
        // Simulated RSA/ECIES decryption
        Ok(encrypted_key.to_vec())
    }

    /// Generate random bytes
    fn generate_random_bytes(size: usize) -> Vec<u8> {
        use scirs2_core::random::{rng, RngExt};
        let mut rand_gen = rng();
        (0..size).map(|_| rand_gen.random_range(0..=255)).collect()
    }

    /// Get statistics
    pub async fn stats(&self) -> E2EEStats {
        self.stats.read().await.clone()
    }
}

/// E2EE statistics
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct E2EEStats {
    /// Messages encrypted
    pub messages_encrypted: u64,

    /// Messages decrypted
    pub messages_decrypted: u64,

    /// Keys generated
    pub keys_generated: u64,

    /// Keys rotated
    pub keys_rotated: u64,

    /// Encryption failures
    pub encryption_failures: u64,

    /// Decryption failures
    pub decryption_failures: u64,
}

/// Homomorphic encryption operations (simulated)
pub struct HomomorphicEncryption {
    config: E2EEConfig,
}

impl HomomorphicEncryption {
    /// Create new homomorphic encryption instance
    pub fn new(config: E2EEConfig) -> Self {
        Self { config }
    }

    /// Add two encrypted values
    pub fn add(&self, a: &[u8], b: &[u8]) -> Result<Vec<u8>> {
        // Simulated homomorphic addition (Paillier property)
        let mut result = Vec::new();
        for i in 0..a.len().min(b.len()) {
            result.push(a[i].wrapping_add(b[i]));
        }
        Ok(result)
    }

    /// Multiply encrypted value by plaintext scalar
    pub fn multiply_scalar(&self, encrypted: &[u8], scalar: u64) -> Result<Vec<u8>> {
        // Simulated scalar multiplication
        let result = encrypted
            .iter()
            .map(|&x| x.wrapping_mul(scalar as u8))
            .collect();
        Ok(result)
    }
}

/// Zero-knowledge proof (simulated)
pub struct ZeroKnowledgeProof {
    config: E2EEConfig,
}

impl ZeroKnowledgeProof {
    /// Create new ZKP instance
    pub fn new(config: E2EEConfig) -> Self {
        Self { config }
    }

    /// Generate proof that value is within range without revealing value
    pub fn prove_range(&self, _value: u64, _min: u64, _max: u64) -> Result<Vec<u8>> {
        // Simulated range proof (Bulletproofs)
        Ok(vec![0u8; 64])
    }

    /// Verify range proof
    pub fn verify_range(&self, _proof: &[u8], _min: u64, _max: u64) -> Result<bool> {
        // Simulated verification
        Ok(true)
    }

    /// Generate proof of membership without revealing element
    pub fn prove_membership(&self, _element: &[u8], _set: &[Vec<u8>]) -> Result<Vec<u8>> {
        // Simulated membership proof
        Ok(vec![0u8; 64])
    }

    /// Verify membership proof
    pub fn verify_membership(&self, _proof: &[u8], _set: &[Vec<u8>]) -> Result<bool> {
        // Simulated verification
        Ok(true)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_key_generation() {
        let config = E2EEConfig::default();
        let manager = E2EEManager::new(config).unwrap();

        let key_pair = manager.generate_key_pair("user-1").await.unwrap();
        assert!(!key_pair.public_key.is_empty());
        assert!(!key_pair.private_key.is_empty());
    }

    #[tokio::test]
    async fn test_encryption_decryption() {
        let config = E2EEConfig::default();
        let manager = E2EEManager::new(config).unwrap();

        // Generate keys for sender and recipient
        manager.generate_key_pair("sender").await.unwrap();
        manager.generate_key_pair("recipient").await.unwrap();

        let plaintext = b"Hello, encrypted world!";
        let encrypted = manager.encrypt("recipient", plaintext).await.unwrap();

        assert_eq!(encrypted.recipients, vec!["recipient"]);
        assert!(!encrypted.ciphertext.is_empty());
    }

    #[tokio::test]
    async fn test_key_rotation() {
        let config = E2EEConfig::default();
        let manager = E2EEManager::new(config).unwrap();

        let key1 = manager.generate_key_pair("user-1").await.unwrap();
        let key2 = manager.rotate_keys("user-1").await.unwrap();

        assert_ne!(key1.key_id, key2.key_id);

        let stats = manager.stats().await;
        assert_eq!(stats.keys_rotated, 1);
    }

    #[tokio::test]
    async fn test_key_expiration() {
        let mut config = E2EEConfig::default();
        config.key_rotation.max_key_age = ChronoDuration::seconds(1);

        let manager = E2EEManager::new(config).unwrap();
        let key = manager.generate_key_pair("user-1").await.unwrap();

        assert!(!key.is_expired());
    }

    #[tokio::test]
    async fn test_homomorphic_addition() {
        let config = E2EEConfig {
            homomorphic_encryption: true,
            ..Default::default()
        };

        let he = HomomorphicEncryption::new(config);

        let encrypted_a = vec![5u8, 10, 15];
        let encrypted_b = vec![3u8, 7, 12];

        let result = he.add(&encrypted_a, &encrypted_b).unwrap();
        assert_eq!(result.len(), 3);
    }

    #[tokio::test]
    async fn test_zero_knowledge_proof() {
        let config = E2EEConfig {
            zero_knowledge_proofs: true,
            ..Default::default()
        };

        let zkp = ZeroKnowledgeProof::new(config);

        // Prove value is in range without revealing value
        let proof = zkp.prove_range(50, 0, 100).unwrap();
        let valid = zkp.verify_range(&proof, 0, 100).unwrap();

        assert!(valid);
    }

    #[tokio::test]
    async fn test_perfect_forward_secrecy() {
        let config = E2EEConfig {
            perfect_forward_secrecy: true,
            ..Default::default()
        };

        let manager = E2EEManager::new(config).unwrap();
        manager.generate_key_pair("recipient").await.unwrap();

        let msg1 = manager.encrypt("recipient", b"message 1").await.unwrap();
        let msg2 = manager.encrypt("recipient", b"message 2").await.unwrap();

        // Each message should have different ephemeral keys
        assert!(msg1.ephemeral_public_key.is_some());
        assert!(msg2.ephemeral_public_key.is_some());
    }
}