Skip to main content

rusmes_storage/backends/
amaters.rs

1//! AmateRS distributed storage backend
2//!
3//! This backend provides distributed message storage using AmateRS
4//! (a hypothetical distributed key-value store similar to Cassandra/ScyllaDB).
5//!
6//! # Features
7//!
8//! - **Distributed blob storage**: Message bodies stored separately from metadata
9//! - **Replication**: Configurable replication factor (default: 3)
10//! - **Consistency levels**: Support for ONE/QUORUM/ALL/LocalQuorum
11//! - **Circuit breaker**: Automatic failover on node failures
12//! - **Retry logic**: Exponential backoff for temporary failures
13//! - **Eventual consistency**: Read-your-writes consistency where possible
14//!
15//! # Configuration
16//!
17//! ```rust,ignore
18//! use rusmes_storage::backends::amaters::{AmatersConfig, ConsistencyLevel};
19//!
20//! let config = AmatersConfig {
21//!     cluster_endpoints: vec!["node1:9042".to_string(), "node2:9042".to_string()],
22//!     replication_factor: 3,
23//!     read_consistency: ConsistencyLevel::Quorum,
24//!     write_consistency: ConsistencyLevel::Quorum,
25//!     timeout_ms: 10000,
26//!     max_retries: 3,
27//!     circuit_breaker_threshold: 5,
28//!     circuit_breaker_timeout_ms: 60000,
29//!     ..Default::default()
30//! };
31//! ```
32//!
33//! # Implementation Note
34//!
35//! This is currently a mock implementation. Replace with real AmateRS client library
36//! when it becomes available. The interface is designed to be compatible with
37//! distributed systems like Apache Cassandra or ScyllaDB.
38
39use crate::traits::{MailboxStore, MessageStore, MetadataStore, StorageBackend};
40use crate::types::{
41    Mailbox, MailboxCounters, MailboxId, MailboxPath, MessageFlags, MessageMetadata, Quota,
42    SearchCriteria,
43};
44use async_trait::async_trait;
45use rusmes_proto::{Mail, MessageId, Username};
46use serde::{Deserialize, Serialize};
47use std::collections::HashMap;
48use std::sync::Arc;
49use tokio::sync::RwLock;
50
51/// AmateRS cluster configuration
52#[derive(Debug, Clone)]
53pub struct AmatersConfig {
54    /// Cluster contact points (host:port)
55    pub cluster_endpoints: Vec<String>,
56    /// Keyspace for metadata
57    pub metadata_keyspace: String,
58    /// Keyspace for message blobs
59    pub blob_keyspace: String,
60    /// Replication factor (default: 3)
61    pub replication_factor: usize,
62    /// Consistency level for reads
63    pub read_consistency: ConsistencyLevel,
64    /// Consistency level for writes
65    pub write_consistency: ConsistencyLevel,
66    /// Connection timeout in milliseconds
67    pub timeout_ms: u64,
68    /// Maximum retry attempts
69    pub max_retries: usize,
70    /// Enable compression
71    pub enable_compression: bool,
72    /// Circuit breaker failure threshold
73    pub circuit_breaker_threshold: usize,
74    /// Circuit breaker timeout in milliseconds
75    pub circuit_breaker_timeout_ms: u64,
76}
77
78impl Default for AmatersConfig {
79    fn default() -> Self {
80        Self {
81            cluster_endpoints: vec!["localhost:9042".to_string()],
82            metadata_keyspace: "rusmes_metadata".to_string(),
83            blob_keyspace: "rusmes_blobs".to_string(),
84            replication_factor: 3,
85            read_consistency: ConsistencyLevel::Quorum,
86            write_consistency: ConsistencyLevel::Quorum,
87            timeout_ms: 10000,
88            max_retries: 3,
89            enable_compression: true,
90            circuit_breaker_threshold: 5,
91            circuit_breaker_timeout_ms: 60000,
92        }
93    }
94}
95
96/// Consistency level for operations
97#[derive(Debug, Clone, Copy, PartialEq, Eq)]
98pub enum ConsistencyLevel {
99    /// Require all replicas
100    All,
101    /// Require quorum of replicas
102    Quorum,
103    /// Require only one replica
104    One,
105    /// Local quorum (same datacenter)
106    LocalQuorum,
107}
108
109/// Serializable mailbox metadata for storage
110#[derive(Debug, Clone, Serialize, Deserialize)]
111struct MailboxRecord {
112    id: String,
113    username: String,
114    path: Vec<String>,
115    uid_validity: u32,
116    uid_next: u32,
117    special_use: Option<String>,
118    created_at: i64,
119}
120
121/// Serializable message metadata for storage
122#[derive(Debug, Clone, Serialize, Deserialize)]
123struct MessageRecord {
124    id: String,
125    mailbox_id: String,
126    uid: u32,
127    sender: Option<String>,
128    recipients: Vec<String>,
129    headers: HashMap<String, String>,
130    size: usize,
131    blob_key: String,
132    created_at: i64,
133}
134
135/// Message blob stored separately
136#[derive(Debug, Clone, Serialize, Deserialize)]
137struct MessageBlob {
138    message_id: String,
139    body: Vec<u8>,
140    compressed: bool,
141}
142
143/// Circuit breaker state for failover handling
144#[derive(Debug, Clone)]
145enum CircuitBreakerState {
146    Closed,
147    Open { opened_at: std::time::Instant },
148    HalfOpen,
149}
150
151/// Circuit breaker for handling node failures
152struct CircuitBreaker {
153    state: Arc<RwLock<CircuitBreakerState>>,
154    failure_count: Arc<RwLock<usize>>,
155    threshold: usize,
156    timeout_ms: u64,
157}
158
159impl CircuitBreaker {
160    fn new(threshold: usize, timeout_ms: u64) -> Self {
161        Self {
162            state: Arc::new(RwLock::new(CircuitBreakerState::Closed)),
163            failure_count: Arc::new(RwLock::new(0)),
164            threshold,
165            timeout_ms,
166        }
167    }
168
169    async fn is_open(&self) -> bool {
170        let state = self.state.read().await;
171        matches!(*state, CircuitBreakerState::Open { .. })
172    }
173
174    async fn record_success(&self) {
175        let mut count = self.failure_count.write().await;
176        *count = 0;
177        let mut state = self.state.write().await;
178        *state = CircuitBreakerState::Closed;
179    }
180
181    async fn record_failure(&self) {
182        let mut count = self.failure_count.write().await;
183        *count += 1;
184
185        if *count >= self.threshold {
186            let mut state = self.state.write().await;
187            *state = CircuitBreakerState::Open {
188                opened_at: std::time::Instant::now(),
189            };
190        }
191    }
192
193    async fn attempt_reset(&self) {
194        let state = self.state.read().await;
195        if let CircuitBreakerState::Open { opened_at } = *state {
196            if opened_at.elapsed().as_millis() as u64 >= self.timeout_ms {
197                drop(state);
198                let mut state = self.state.write().await;
199                *state = CircuitBreakerState::HalfOpen;
200            }
201        }
202    }
203}
204
205/// Mock AmateRS client implementing the AmateRS distributed key-value store interface.
206/// Replace with real AmateRS client library when it becomes available.
207struct AmatersClient {
208    config: AmatersConfig,
209    metadata: Arc<RwLock<HashMap<String, Vec<u8>>>>,
210    blobs: Arc<RwLock<HashMap<String, Vec<u8>>>>,
211    circuit_breaker: CircuitBreaker,
212}
213
214impl AmatersClient {
215    fn new(config: AmatersConfig) -> Self {
216        let circuit_breaker = CircuitBreaker::new(
217            config.circuit_breaker_threshold,
218            config.circuit_breaker_timeout_ms,
219        );
220
221        Self {
222            config,
223            metadata: Arc::new(RwLock::new(HashMap::new())),
224            blobs: Arc::new(RwLock::new(HashMap::new())),
225            circuit_breaker,
226        }
227    }
228
229    async fn connect(&self) -> anyhow::Result<()> {
230        // In production: establish connections to cluster with retry and failover
231        tracing::info!(
232            "Connecting to AmateRS cluster at {:?}",
233            self.config.cluster_endpoints
234        );
235
236        // Check circuit breaker
237        if self.circuit_breaker.is_open().await {
238            self.circuit_breaker.attempt_reset().await;
239            if self.circuit_breaker.is_open().await {
240                return Err(anyhow::anyhow!("Circuit breaker is open"));
241            }
242        }
243
244        Ok(())
245    }
246
247    async fn init_keyspaces(&self) -> anyhow::Result<()> {
248        // In production: CREATE KEYSPACE IF NOT EXISTS with replication settings
249        tracing::info!(
250            "Initializing keyspaces: {} and {}",
251            self.config.metadata_keyspace,
252            self.config.blob_keyspace
253        );
254        Ok(())
255    }
256
257    async fn put(&self, keyspace: &str, key: String, value: Vec<u8>) -> anyhow::Result<()> {
258        // Check circuit breaker before attempting
259        if self.circuit_breaker.is_open().await {
260            self.circuit_breaker.attempt_reset().await;
261            if self.circuit_breaker.is_open().await {
262                return Err(anyhow::anyhow!(
263                    "Circuit breaker is open, rejecting request"
264                ));
265            }
266        }
267
268        let store = if keyspace.contains("blob") {
269            &self.blobs
270        } else {
271            &self.metadata
272        };
273
274        // Retry logic with exponential backoff
275        let mut last_error = None;
276        for attempt in 0..self.config.max_retries {
277            match self.put_with_retry(store, key.clone(), value.clone()).await {
278                Ok(_) => {
279                    self.circuit_breaker.record_success().await;
280                    return Ok(());
281                }
282                Err(e) => {
283                    tracing::warn!("Put failed (attempt {}): {}", attempt + 1, e);
284                    last_error = Some(e);
285
286                    if attempt < self.config.max_retries - 1 {
287                        // Exponential backoff: 100ms, 200ms, 400ms, etc.
288                        let backoff = 100 * 2_u64.pow(attempt as u32);
289                        tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
290                    }
291                }
292            }
293        }
294
295        // All retries failed
296        self.circuit_breaker.record_failure().await;
297        Err(last_error.unwrap_or_else(|| anyhow::anyhow!("Put operation failed")))
298    }
299
300    async fn put_with_retry(
301        &self,
302        store: &Arc<RwLock<HashMap<String, Vec<u8>>>>,
303        key: String,
304        value: Vec<u8>,
305    ) -> anyhow::Result<()> {
306        let mut map = store.write().await;
307        map.insert(key, value);
308        Ok(())
309    }
310
311    async fn get(&self, keyspace: &str, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
312        let store = if keyspace.contains("blob") {
313            &self.blobs
314        } else {
315            &self.metadata
316        };
317
318        let map = store.read().await;
319        Ok(map.get(key).cloned())
320    }
321
322    async fn delete(&self, keyspace: &str, key: &str) -> anyhow::Result<()> {
323        let store = if keyspace.contains("blob") {
324            &self.blobs
325        } else {
326            &self.metadata
327        };
328
329        let mut map = store.write().await;
330        map.remove(key);
331        Ok(())
332    }
333
334    async fn list_prefix(&self, keyspace: &str, prefix: &str) -> anyhow::Result<Vec<String>> {
335        let store = if keyspace.contains("blob") {
336            &self.blobs
337        } else {
338            &self.metadata
339        };
340
341        let map = store.read().await;
342        Ok(map
343            .keys()
344            .filter(|k| k.starts_with(prefix))
345            .cloned()
346            .collect())
347    }
348}
349
350/// AmateRS distributed storage backend
351pub struct AmatersBackend {
352    client: Arc<AmatersClient>,
353    config: AmatersConfig,
354}
355
356impl AmatersBackend {
357    /// Create a new AmateRS backend
358    pub async fn new(config: AmatersConfig) -> anyhow::Result<Self> {
359        let client = Arc::new(AmatersClient::new(config.clone()));
360        client.connect().await?;
361        client.init_keyspaces().await?;
362
363        Ok(Self { client, config })
364    }
365
366    /// Initialize schema
367    pub async fn init_schema(&self) -> anyhow::Result<()> {
368        // In production: CREATE TABLE statements for metadata tables
369        self.client.init_keyspaces().await
370    }
371}
372
373impl StorageBackend for AmatersBackend {
374    fn mailbox_store(&self) -> Arc<dyn MailboxStore> {
375        Arc::new(AmatersMailboxStore {
376            client: self.client.clone(),
377            keyspace: self.config.metadata_keyspace.clone(),
378        })
379    }
380
381    fn message_store(&self) -> Arc<dyn MessageStore> {
382        Arc::new(AmatersMessageStore {
383            client: self.client.clone(),
384            metadata_keyspace: self.config.metadata_keyspace.clone(),
385            blob_keyspace: self.config.blob_keyspace.clone(),
386        })
387    }
388
389    fn metadata_store(&self) -> Arc<dyn MetadataStore> {
390        Arc::new(AmatersMetadataStore {
391            client: self.client.clone(),
392            keyspace: self.config.metadata_keyspace.clone(),
393        })
394    }
395}
396
397/// AmateRS mailbox store
398struct AmatersMailboxStore {
399    client: Arc<AmatersClient>,
400    keyspace: String,
401}
402
403#[async_trait]
404impl MailboxStore for AmatersMailboxStore {
405    async fn create_mailbox(&self, path: &MailboxPath) -> anyhow::Result<MailboxId> {
406        let mailbox = Mailbox::new(path.clone());
407        let id = *mailbox.id();
408
409        let record = MailboxRecord {
410            id: id.to_string(),
411            username: path.user().to_string(),
412            path: path.path().to_vec(),
413            uid_validity: mailbox.uid_validity(),
414            uid_next: mailbox.uid_next(),
415            special_use: mailbox.special_use().map(|s| s.to_string()),
416            created_at: std::time::SystemTime::now()
417                .duration_since(std::time::UNIX_EPOCH)
418                .unwrap_or_default()
419                .as_secs() as i64,
420        };
421
422        let key = format!("mailbox:{}", id);
423        let value = serde_json::to_vec(&record)?;
424
425        self.client.put(&self.keyspace, key, value).await?;
426
427        // Also index by user
428        let user_key = format!("user:{}:mailbox:{}", path.user(), id);
429        self.client.put(&self.keyspace, user_key, vec![]).await?;
430
431        Ok(id)
432    }
433
434    async fn delete_mailbox(&self, id: &MailboxId) -> anyhow::Result<()> {
435        let key = format!("mailbox:{}", id);
436        self.client.delete(&self.keyspace, &key).await?;
437        Ok(())
438    }
439
440    async fn rename_mailbox(&self, id: &MailboxId, new_path: &MailboxPath) -> anyhow::Result<()> {
441        let key = format!("mailbox:{}", id);
442        let data = self
443            .client
444            .get(&self.keyspace, &key)
445            .await?
446            .ok_or_else(|| anyhow::anyhow!("Mailbox not found"))?;
447
448        let mut record: MailboxRecord = serde_json::from_slice(&data)?;
449        record.path = new_path.path().to_vec();
450
451        let value = serde_json::to_vec(&record)?;
452        self.client.put(&self.keyspace, key, value).await?;
453
454        Ok(())
455    }
456
457    async fn get_mailbox(&self, id: &MailboxId) -> anyhow::Result<Option<Mailbox>> {
458        let key = format!("mailbox:{}", id);
459        let data = match self.client.get(&self.keyspace, &key).await? {
460            Some(d) => d,
461            None => return Ok(None),
462        };
463
464        let record: MailboxRecord = serde_json::from_slice(&data)?;
465        let username = Username::new(record.username)
466            .map_err(|e| anyhow::anyhow!("Invalid username: {}", e))?;
467        let path = MailboxPath::new(username, record.path);
468
469        let mut mailbox = Mailbox::new(path);
470        mailbox.set_special_use(record.special_use);
471
472        Ok(Some(mailbox))
473    }
474
475    async fn list_mailboxes(&self, user: &Username) -> anyhow::Result<Vec<Mailbox>> {
476        let prefix = format!("user:{}:mailbox:", user);
477        let keys = self.client.list_prefix(&self.keyspace, &prefix).await?;
478
479        let mut mailboxes = Vec::new();
480        for key in keys {
481            if let Some(mailbox_id_str) = key.strip_prefix(&prefix) {
482                let mailbox_key = format!("mailbox:{}", mailbox_id_str);
483                if let Ok(Some(data)) = self.client.get(&self.keyspace, &mailbox_key).await {
484                    if let Ok(record) = serde_json::from_slice::<MailboxRecord>(&data) {
485                        if let Ok(username) = Username::new(record.username) {
486                            let path = MailboxPath::new(username, record.path);
487                            let mut mailbox = Mailbox::new(path);
488                            mailbox.set_special_use(record.special_use);
489                            mailboxes.push(mailbox);
490                        }
491                    }
492                }
493            }
494        }
495
496        Ok(mailboxes)
497    }
498
499    async fn get_user_inbox(&self, user: &Username) -> anyhow::Result<Option<MailboxId>> {
500        let prefix = format!("user:{}:mailbox:", user);
501        let keys = self.client.list_prefix(&self.keyspace, &prefix).await?;
502
503        for key in keys {
504            if let Some(mailbox_id_str) = key.strip_prefix(&prefix) {
505                let mailbox_key = format!("mailbox:{}", mailbox_id_str);
506                if let Ok(Some(data)) = self.client.get(&self.keyspace, &mailbox_key).await {
507                    if let Ok(record) = serde_json::from_slice::<MailboxRecord>(&data) {
508                        let is_inbox = record.path == vec!["INBOX"]
509                            || record
510                                .special_use
511                                .as_deref()
512                                .map(|s| s.eq_ignore_ascii_case("inbox"))
513                                .unwrap_or(false);
514                        if is_inbox {
515                            if let Ok(uuid) = uuid::Uuid::parse_str(mailbox_id_str) {
516                                return Ok(Some(MailboxId::from_uuid(uuid)));
517                            }
518                        }
519                    }
520                }
521            }
522        }
523
524        Ok(None)
525    }
526
527    async fn subscribe_mailbox(&self, user: &Username, mailbox_name: String) -> anyhow::Result<()> {
528        let key = format!("subscription:{}:{}", user, mailbox_name);
529        self.client.put(&self.keyspace, key, vec![1]).await?;
530        Ok(())
531    }
532
533    async fn unsubscribe_mailbox(&self, user: &Username, mailbox_name: &str) -> anyhow::Result<()> {
534        let key = format!("subscription:{}:{}", user, mailbox_name);
535        self.client.delete(&self.keyspace, &key).await?;
536        Ok(())
537    }
538
539    async fn list_subscriptions(&self, user: &Username) -> anyhow::Result<Vec<String>> {
540        let prefix = format!("subscription:{}:", user);
541        let keys = self.client.list_prefix(&self.keyspace, &prefix).await?;
542
543        Ok(keys
544            .into_iter()
545            .filter_map(|k| k.strip_prefix(&prefix).map(|s| s.to_string()))
546            .collect())
547    }
548}
549
550/// AmateRS message store with blob separation
551struct AmatersMessageStore {
552    client: Arc<AmatersClient>,
553    metadata_keyspace: String,
554    blob_keyspace: String,
555}
556
557#[async_trait]
558impl MessageStore for AmatersMessageStore {
559    async fn append_message(
560        &self,
561        mailbox_id: &MailboxId,
562        message: Mail,
563    ) -> anyhow::Result<MessageMetadata> {
564        let message_id = *message.message_id();
565
566        // Store message blob separately
567        // In production: serialize the message body properly
568        let blob = MessageBlob {
569            message_id: message_id.to_string(),
570            body: vec![], // Placeholder - would serialize message.message()
571            compressed: false,
572        };
573
574        let blob_key = format!("blob:{}", message_id);
575        let blob_value = serde_json::to_vec(&blob)?;
576        self.client
577            .put(&self.blob_keyspace, blob_key.clone(), blob_value)
578            .await?;
579
580        // Store message metadata
581        let record = MessageRecord {
582            id: message_id.to_string(),
583            mailbox_id: mailbox_id.to_string(),
584            uid: 1, // Would need to get next UID from mailbox
585            sender: message.sender().map(|s| s.to_string()),
586            recipients: message.recipients().iter().map(|r| r.to_string()).collect(),
587            headers: HashMap::new(),
588            size: message.size(),
589            blob_key,
590            created_at: std::time::SystemTime::now()
591                .duration_since(std::time::UNIX_EPOCH)
592                .unwrap_or_default()
593                .as_secs() as i64,
594        };
595
596        let metadata_key = format!("message:{}", message_id);
597        let metadata_value = serde_json::to_vec(&record)?;
598        self.client
599            .put(&self.metadata_keyspace, metadata_key, metadata_value)
600            .await?;
601
602        // Index by mailbox
603        let mailbox_index_key = format!("mailbox:{}:message:{}", mailbox_id, message_id);
604        self.client
605            .put(&self.metadata_keyspace, mailbox_index_key, vec![])
606            .await?;
607
608        let metadata = MessageMetadata::new(
609            message_id,
610            *mailbox_id,
611            1,
612            MessageFlags::new(),
613            message.size(),
614        );
615
616        Ok(metadata)
617    }
618
619    async fn get_message(&self, _message_id: &MessageId) -> anyhow::Result<Option<Mail>> {
620        // In production: reconstruct Mail from stored metadata and blob
621        // For now, return None as this requires MimeMessage parsing
622        Ok(None)
623    }
624
625    async fn delete_messages(&self, message_ids: &[MessageId]) -> anyhow::Result<()> {
626        for message_id in message_ids {
627            let key = format!("message:{}", message_id);
628
629            // Get blob key before deleting metadata
630            if let Some(data) = self.client.get(&self.metadata_keyspace, &key).await? {
631                if let Ok(record) = serde_json::from_slice::<MessageRecord>(&data) {
632                    self.client
633                        .delete(&self.blob_keyspace, &record.blob_key)
634                        .await?;
635                }
636            }
637
638            self.client.delete(&self.metadata_keyspace, &key).await?;
639        }
640        Ok(())
641    }
642
643    async fn set_flags(
644        &self,
645        message_ids: &[MessageId],
646        _flags: MessageFlags,
647    ) -> anyhow::Result<()> {
648        // In production: update flags in metadata
649        for message_id in message_ids {
650            let key = format!("flags:{}", message_id);
651            let value = vec![1]; // Placeholder
652            self.client.put(&self.metadata_keyspace, key, value).await?;
653        }
654        Ok(())
655    }
656
657    async fn search(
658        &self,
659        mailbox_id: &MailboxId,
660        _criteria: SearchCriteria,
661    ) -> anyhow::Result<Vec<MessageId>> {
662        let prefix = format!("mailbox:{}:message:", mailbox_id);
663        let keys = self
664            .client
665            .list_prefix(&self.metadata_keyspace, &prefix)
666            .await?;
667
668        let message_ids = keys
669            .into_iter()
670            .filter_map(|k| k.strip_prefix(&prefix).map(|_id_str| MessageId::new()))
671            .collect();
672
673        Ok(message_ids)
674    }
675
676    async fn copy_messages(
677        &self,
678        message_ids: &[MessageId],
679        dest_mailbox_id: &MailboxId,
680    ) -> anyhow::Result<Vec<MessageMetadata>> {
681        let mut metadata_list = Vec::new();
682
683        for message_id in message_ids {
684            if let Some(message) = self.get_message(message_id).await? {
685                let metadata = self.append_message(dest_mailbox_id, message).await?;
686                metadata_list.push(metadata);
687            }
688        }
689
690        Ok(metadata_list)
691    }
692
693    async fn get_mailbox_messages(
694        &self,
695        mailbox_id: &MailboxId,
696    ) -> anyhow::Result<Vec<MessageMetadata>> {
697        let prefix = format!("mailbox:{}:message:", mailbox_id);
698        let keys = self
699            .client
700            .list_prefix(&self.metadata_keyspace, &prefix)
701            .await?;
702
703        let mut metadata_list = Vec::new();
704        for key in keys {
705            if key.strip_prefix(&prefix).is_some() {
706                let message_id = MessageId::new();
707                let metadata =
708                    MessageMetadata::new(message_id, *mailbox_id, 1, MessageFlags::new(), 0);
709                metadata_list.push(metadata);
710            }
711        }
712
713        Ok(metadata_list)
714    }
715}
716
717/// AmateRS metadata store
718struct AmatersMetadataStore {
719    client: Arc<AmatersClient>,
720    keyspace: String,
721}
722
723#[async_trait]
724impl MetadataStore for AmatersMetadataStore {
725    async fn get_user_quota(&self, user: &Username) -> anyhow::Result<Quota> {
726        let key = format!("quota:{}", user);
727        let data = match self.client.get(&self.keyspace, &key).await? {
728            Some(d) => d,
729            None => return Ok(Quota::new(0, 1024 * 1024 * 1024)),
730        };
731
732        Ok(serde_json::from_slice(&data)?)
733    }
734
735    async fn set_user_quota(&self, user: &Username, quota: Quota) -> anyhow::Result<()> {
736        let key = format!("quota:{}", user);
737        let value = serde_json::to_vec(&quota)?;
738        self.client.put(&self.keyspace, key, value).await?;
739        Ok(())
740    }
741
742    async fn get_mailbox_counters(
743        &self,
744        mailbox_id: &MailboxId,
745    ) -> anyhow::Result<MailboxCounters> {
746        let key = format!("counters:{}", mailbox_id);
747        let data = match self.client.get(&self.keyspace, &key).await? {
748            Some(d) => d,
749            None => return Ok(MailboxCounters::default()),
750        };
751
752        Ok(serde_json::from_slice(&data)?)
753    }
754}
755
756// Note: MailboxId UUID conversion would be implemented in production
757
758#[cfg(test)]
759mod tests {
760    use super::*;
761
762    #[test]
763    fn test_amaters_config_default() {
764        let config = AmatersConfig::default();
765        assert_eq!(config.cluster_endpoints.len(), 1);
766        assert_eq!(config.replication_factor, 3);
767        assert_eq!(config.read_consistency, ConsistencyLevel::Quorum);
768        assert_eq!(config.write_consistency, ConsistencyLevel::Quorum);
769    }
770
771    #[test]
772    fn test_consistency_levels() {
773        assert_eq!(ConsistencyLevel::All, ConsistencyLevel::All);
774        assert_eq!(ConsistencyLevel::Quorum, ConsistencyLevel::Quorum);
775        assert_eq!(ConsistencyLevel::One, ConsistencyLevel::One);
776        assert_ne!(ConsistencyLevel::All, ConsistencyLevel::One);
777    }
778
779    #[tokio::test]
780    async fn test_amaters_client_creation() {
781        let config = AmatersConfig::default();
782        let client = AmatersClient::new(config);
783        assert!(client.connect().await.is_ok());
784    }
785
786    #[tokio::test]
787    async fn test_amaters_backend_creation() {
788        let config = AmatersConfig::default();
789        let backend = AmatersBackend::new(config).await;
790        assert!(backend.is_ok());
791    }
792
793    #[tokio::test]
794    async fn test_put_and_get() {
795        let config = AmatersConfig::default();
796        let client = AmatersClient::new(config);
797        client.connect().await.unwrap();
798
799        let key = "test_key".to_string();
800        let value = vec![1, 2, 3, 4];
801
802        client
803            .put("metadata", key.clone(), value.clone())
804            .await
805            .unwrap();
806        let retrieved = client.get("metadata", &key).await.unwrap();
807
808        assert_eq!(retrieved, Some(value));
809    }
810
811    #[tokio::test]
812    async fn test_delete() {
813        let config = AmatersConfig::default();
814        let client = AmatersClient::new(config);
815        client.connect().await.unwrap();
816
817        let key = "delete_key".to_string();
818        let value = vec![5, 6, 7, 8];
819
820        client.put("metadata", key.clone(), value).await.unwrap();
821        client.delete("metadata", &key).await.unwrap();
822
823        let retrieved = client.get("metadata", &key).await.unwrap();
824        assert_eq!(retrieved, None);
825    }
826
827    #[tokio::test]
828    async fn test_list_prefix() {
829        let config = AmatersConfig::default();
830        let client = AmatersClient::new(config);
831        client.connect().await.unwrap();
832
833        client
834            .put("metadata", "user:alice:mailbox:1".to_string(), vec![])
835            .await
836            .unwrap();
837        client
838            .put("metadata", "user:alice:mailbox:2".to_string(), vec![])
839            .await
840            .unwrap();
841        client
842            .put("metadata", "user:bob:mailbox:1".to_string(), vec![])
843            .await
844            .unwrap();
845
846        let alice_mailboxes = client.list_prefix("metadata", "user:alice:").await.unwrap();
847        assert_eq!(alice_mailboxes.len(), 2);
848    }
849
850    #[test]
851    fn test_mailbox_record_serialization() {
852        let record = MailboxRecord {
853            id: "test-id".to_string(),
854            username: "user@example.com".to_string(),
855            path: vec!["INBOX".to_string()],
856            uid_validity: 1,
857            uid_next: 1,
858            special_use: None,
859            created_at: 1234567890,
860        };
861
862        let serialized = serde_json::to_vec(&record).unwrap();
863        let deserialized: MailboxRecord = serde_json::from_slice(&serialized).unwrap();
864
865        assert_eq!(record.id, deserialized.id);
866        assert_eq!(record.username, deserialized.username);
867    }
868
869    #[test]
870    fn test_message_record_serialization() {
871        let record = MessageRecord {
872            id: "msg-id".to_string(),
873            mailbox_id: "mailbox-id".to_string(),
874            uid: 1,
875            sender: Some("sender@example.com".to_string()),
876            recipients: vec!["recipient@example.com".to_string()],
877            headers: HashMap::new(),
878            size: 1024,
879            blob_key: "blob:msg-id".to_string(),
880            created_at: 1234567890,
881        };
882
883        let serialized = serde_json::to_vec(&record).unwrap();
884        let deserialized: MessageRecord = serde_json::from_slice(&serialized).unwrap();
885
886        assert_eq!(record.id, deserialized.id);
887        assert_eq!(record.size, deserialized.size);
888    }
889
890    #[test]
891    fn test_message_blob_serialization() {
892        let blob = MessageBlob {
893            message_id: "msg-id".to_string(),
894            body: vec![1, 2, 3, 4],
895            compressed: false,
896        };
897
898        let serialized = serde_json::to_vec(&blob).unwrap();
899        let deserialized: MessageBlob = serde_json::from_slice(&serialized).unwrap();
900
901        assert_eq!(blob.message_id, deserialized.message_id);
902        assert_eq!(blob.body, deserialized.body);
903    }
904
905    #[test]
906    fn test_amaters_config_custom() {
907        let config = AmatersConfig {
908            cluster_endpoints: vec![
909                "node1.example.com:9042".to_string(),
910                "node2.example.com:9042".to_string(),
911            ],
912            replication_factor: 5,
913            read_consistency: ConsistencyLevel::LocalQuorum,
914            write_consistency: ConsistencyLevel::All,
915            ..Default::default()
916        };
917
918        assert_eq!(config.cluster_endpoints.len(), 2);
919        assert_eq!(config.replication_factor, 5);
920    }
921
922    #[test]
923    fn test_keyspace_configuration() {
924        let config = AmatersConfig {
925            metadata_keyspace: "custom_metadata".to_string(),
926            blob_keyspace: "custom_blobs".to_string(),
927            ..Default::default()
928        };
929
930        assert_eq!(config.metadata_keyspace, "custom_metadata");
931        assert_eq!(config.blob_keyspace, "custom_blobs");
932    }
933
934    #[test]
935    fn test_compression_flag() {
936        let config = AmatersConfig {
937            enable_compression: true,
938            ..Default::default()
939        };
940        assert!(config.enable_compression);
941
942        let config_no_compression = AmatersConfig {
943            enable_compression: false,
944            ..Default::default()
945        };
946        assert!(!config_no_compression.enable_compression);
947    }
948
949    #[test]
950    fn test_retry_configuration() {
951        let config = AmatersConfig {
952            max_retries: 5,
953            ..Default::default()
954        };
955        assert_eq!(config.max_retries, 5);
956    }
957
958    #[test]
959    fn test_timeout_configuration() {
960        let config = AmatersConfig {
961            timeout_ms: 30000,
962            ..Default::default()
963        };
964        assert_eq!(config.timeout_ms, 30000);
965    }
966
967    #[tokio::test]
968    async fn test_init_keyspaces() {
969        let config = AmatersConfig::default();
970        let client = AmatersClient::new(config);
971        assert!(client.init_keyspaces().await.is_ok());
972    }
973
974    #[tokio::test]
975    async fn test_blob_keyspace_separation() {
976        let config = AmatersConfig::default();
977        let client = AmatersClient::new(config);
978
979        client
980            .put("metadata", "key1".to_string(), vec![1])
981            .await
982            .unwrap();
983        client
984            .put("blobs", "key2".to_string(), vec![2])
985            .await
986            .unwrap();
987
988        let meta_val = client.get("metadata", "key1").await.unwrap();
989        let blob_val = client.get("blobs", "key2").await.unwrap();
990
991        assert_eq!(meta_val, Some(vec![1]));
992        assert_eq!(blob_val, Some(vec![2]));
993    }
994
995    #[tokio::test]
996    async fn test_multiple_contact_points() {
997        let config = AmatersConfig {
998            cluster_endpoints: vec![
999                "host1:9042".to_string(),
1000                "host2:9042".to_string(),
1001                "host3:9042".to_string(),
1002            ],
1003            ..Default::default()
1004        };
1005
1006        let client = AmatersClient::new(config);
1007        assert!(client.connect().await.is_ok());
1008    }
1009
1010    #[test]
1011    fn test_circuit_breaker_creation() {
1012        let cb = CircuitBreaker::new(5, 60000);
1013        assert_eq!(cb.threshold, 5);
1014        assert_eq!(cb.timeout_ms, 60000);
1015    }
1016
1017    #[tokio::test]
1018    async fn test_circuit_breaker_closed_initially() {
1019        let cb = CircuitBreaker::new(3, 60000);
1020        assert!(!cb.is_open().await);
1021    }
1022
1023    #[tokio::test]
1024    async fn test_circuit_breaker_opens_after_threshold() {
1025        let cb = CircuitBreaker::new(3, 60000);
1026
1027        cb.record_failure().await;
1028        assert!(!cb.is_open().await);
1029
1030        cb.record_failure().await;
1031        assert!(!cb.is_open().await);
1032
1033        cb.record_failure().await;
1034        assert!(cb.is_open().await);
1035    }
1036
1037    #[tokio::test]
1038    async fn test_circuit_breaker_reset_on_success() {
1039        let cb = CircuitBreaker::new(3, 60000);
1040
1041        cb.record_failure().await;
1042        cb.record_failure().await;
1043        assert!(!cb.is_open().await);
1044
1045        cb.record_success().await;
1046        let count = cb.failure_count.read().await;
1047        assert_eq!(*count, 0);
1048    }
1049
1050    #[tokio::test]
1051    async fn test_circuit_breaker_half_open_after_timeout() {
1052        let cb = CircuitBreaker::new(2, 100); // 100ms timeout
1053
1054        cb.record_failure().await;
1055        cb.record_failure().await;
1056        assert!(cb.is_open().await);
1057
1058        tokio::time::sleep(tokio::time::Duration::from_millis(150)).await;
1059        cb.attempt_reset().await;
1060
1061        let state = cb.state.read().await;
1062        assert!(matches!(*state, CircuitBreakerState::HalfOpen));
1063    }
1064
1065    #[tokio::test]
1066    async fn test_config_cluster_endpoints() {
1067        let config = AmatersConfig::default();
1068        assert_eq!(config.cluster_endpoints.len(), 1);
1069        assert_eq!(config.cluster_endpoints[0], "localhost:9042");
1070    }
1071
1072    #[tokio::test]
1073    async fn test_config_timeout_ms() {
1074        let config = AmatersConfig {
1075            timeout_ms: 5000,
1076            ..Default::default()
1077        };
1078        assert_eq!(config.timeout_ms, 5000);
1079    }
1080
1081    #[tokio::test]
1082    async fn test_config_circuit_breaker_settings() {
1083        let config = AmatersConfig {
1084            circuit_breaker_threshold: 10,
1085            circuit_breaker_timeout_ms: 120000,
1086            ..Default::default()
1087        };
1088        assert_eq!(config.circuit_breaker_threshold, 10);
1089        assert_eq!(config.circuit_breaker_timeout_ms, 120000);
1090    }
1091
1092    #[tokio::test]
1093    async fn test_put_records_success() {
1094        let config = AmatersConfig::default();
1095        let client = AmatersClient::new(config);
1096        client.connect().await.unwrap();
1097
1098        client
1099            .put("metadata", "key1".to_string(), vec![1, 2, 3])
1100            .await
1101            .unwrap();
1102
1103        let count = client.circuit_breaker.failure_count.read().await;
1104        assert_eq!(*count, 0);
1105    }
1106
1107    #[tokio::test]
1108    async fn test_get_nonexistent_key() {
1109        let config = AmatersConfig::default();
1110        let client = AmatersClient::new(config);
1111
1112        let result = client.get("metadata", "nonexistent").await.unwrap();
1113        assert_eq!(result, None);
1114    }
1115
1116    #[tokio::test]
1117    async fn test_delete_nonexistent_key() {
1118        let config = AmatersConfig::default();
1119        let client = AmatersClient::new(config);
1120
1121        let result = client.delete("metadata", "nonexistent").await;
1122        assert!(result.is_ok());
1123    }
1124
1125    #[tokio::test]
1126    async fn test_list_prefix_empty() {
1127        let config = AmatersConfig::default();
1128        let client = AmatersClient::new(config);
1129
1130        let keys = client.list_prefix("metadata", "empty:").await.unwrap();
1131        assert_eq!(keys.len(), 0);
1132    }
1133
1134    #[tokio::test]
1135    async fn test_blob_and_metadata_separation() {
1136        let config = AmatersConfig::default();
1137        let client = AmatersClient::new(config);
1138
1139        client
1140            .put("metadata", "key1".to_string(), vec![1])
1141            .await
1142            .unwrap();
1143        client
1144            .put("blob_keyspace", "key1".to_string(), vec![2])
1145            .await
1146            .unwrap();
1147
1148        let meta = client.get("metadata", "key1").await.unwrap();
1149        let blob = client.get("blob_keyspace", "key1").await.unwrap();
1150
1151        assert_eq!(meta, Some(vec![1]));
1152        assert_eq!(blob, Some(vec![2]));
1153    }
1154
1155    #[tokio::test]
1156    async fn test_backend_stores_creation() {
1157        let config = AmatersConfig::default();
1158        let backend = AmatersBackend::new(config).await.unwrap();
1159
1160        let _mailbox_store = backend.mailbox_store();
1161        let _message_store = backend.message_store();
1162        let _metadata_store = backend.metadata_store();
1163    }
1164
1165    #[tokio::test]
1166    async fn test_init_schema() {
1167        let config = AmatersConfig::default();
1168        let backend = AmatersBackend::new(config).await.unwrap();
1169        assert!(backend.init_schema().await.is_ok());
1170    }
1171
1172    #[test]
1173    fn test_consistency_level_all() {
1174        let level = ConsistencyLevel::All;
1175        assert_eq!(level, ConsistencyLevel::All);
1176    }
1177
1178    #[test]
1179    fn test_consistency_level_one() {
1180        let level = ConsistencyLevel::One;
1181        assert_eq!(level, ConsistencyLevel::One);
1182    }
1183
1184    #[test]
1185    fn test_consistency_level_local_quorum() {
1186        let level = ConsistencyLevel::LocalQuorum;
1187        assert_eq!(level, ConsistencyLevel::LocalQuorum);
1188    }
1189
1190    #[tokio::test]
1191    async fn test_mailbox_subscription() {
1192        let config = AmatersConfig::default();
1193        let backend = AmatersBackend::new(config).await.unwrap();
1194        let store = backend.mailbox_store();
1195
1196        let user = Username::new("user@example.com".to_string()).unwrap();
1197        store
1198            .subscribe_mailbox(&user, "INBOX".to_string())
1199            .await
1200            .unwrap();
1201
1202        let subs = store.list_subscriptions(&user).await.unwrap();
1203        assert_eq!(subs.len(), 1);
1204        assert!(subs.contains(&"INBOX".to_string()));
1205    }
1206
1207    #[tokio::test]
1208    async fn test_mailbox_unsubscription() {
1209        let config = AmatersConfig::default();
1210        let backend = AmatersBackend::new(config).await.unwrap();
1211        let store = backend.mailbox_store();
1212
1213        let user = Username::new("user@example.com".to_string()).unwrap();
1214        store
1215            .subscribe_mailbox(&user, "INBOX".to_string())
1216            .await
1217            .unwrap();
1218        store.unsubscribe_mailbox(&user, "INBOX").await.unwrap();
1219
1220        let subs = store.list_subscriptions(&user).await.unwrap();
1221        assert_eq!(subs.len(), 0);
1222    }
1223
1224    #[tokio::test]
1225    async fn test_multiple_subscriptions() {
1226        let config = AmatersConfig::default();
1227        let backend = AmatersBackend::new(config).await.unwrap();
1228        let store = backend.mailbox_store();
1229
1230        let user = Username::new("user@example.com".to_string()).unwrap();
1231        store
1232            .subscribe_mailbox(&user, "INBOX".to_string())
1233            .await
1234            .unwrap();
1235        store
1236            .subscribe_mailbox(&user, "Sent".to_string())
1237            .await
1238            .unwrap();
1239        store
1240            .subscribe_mailbox(&user, "Drafts".to_string())
1241            .await
1242            .unwrap();
1243
1244        let subs = store.list_subscriptions(&user).await.unwrap();
1245        assert_eq!(subs.len(), 3);
1246    }
1247
1248    #[tokio::test]
1249    async fn test_quota_operations() {
1250        let config = AmatersConfig::default();
1251        let backend = AmatersBackend::new(config).await.unwrap();
1252        let store = backend.metadata_store();
1253
1254        let user = Username::new("user@example.com".to_string()).unwrap();
1255        let quota = Quota::new(1000, 10000);
1256
1257        store.set_user_quota(&user, quota).await.unwrap();
1258        let retrieved = store.get_user_quota(&user).await.unwrap();
1259
1260        assert_eq!(retrieved.used, 1000);
1261        assert_eq!(retrieved.limit, 10000);
1262    }
1263
1264    #[tokio::test]
1265    async fn test_mailbox_counters() {
1266        let config = AmatersConfig::default();
1267        let backend = AmatersBackend::new(config).await.unwrap();
1268        let store = backend.metadata_store();
1269
1270        let mailbox_id = MailboxId::new();
1271        let counters = store.get_mailbox_counters(&mailbox_id).await.unwrap();
1272
1273        assert_eq!(counters.exists, 0);
1274        assert_eq!(counters.recent, 0);
1275        assert_eq!(counters.unseen, 0);
1276    }
1277
1278    #[tokio::test]
1279    async fn test_message_blob_compression_flag() {
1280        let blob = MessageBlob {
1281            message_id: "test-id".to_string(),
1282            body: vec![1, 2, 3, 4, 5],
1283            compressed: true,
1284        };
1285
1286        assert!(blob.compressed);
1287        assert_eq!(blob.body.len(), 5);
1288    }
1289
1290    #[tokio::test]
1291    async fn test_replication_factor_config() {
1292        let config = AmatersConfig {
1293            replication_factor: 5,
1294            ..Default::default()
1295        };
1296
1297        assert_eq!(config.replication_factor, 5);
1298    }
1299
1300    #[tokio::test]
1301    async fn test_custom_keyspace_names() {
1302        let config = AmatersConfig {
1303            metadata_keyspace: "custom_meta".to_string(),
1304            blob_keyspace: "custom_blob".to_string(),
1305            ..Default::default()
1306        };
1307
1308        assert_eq!(config.metadata_keyspace, "custom_meta");
1309        assert_eq!(config.blob_keyspace, "custom_blob");
1310    }
1311
1312    #[tokio::test]
1313    async fn test_eventual_consistency_with_quorum() {
1314        let config = AmatersConfig {
1315            read_consistency: ConsistencyLevel::Quorum,
1316            write_consistency: ConsistencyLevel::Quorum,
1317            ..Default::default()
1318        };
1319
1320        assert_eq!(config.read_consistency, ConsistencyLevel::Quorum);
1321        assert_eq!(config.write_consistency, ConsistencyLevel::Quorum);
1322    }
1323
1324    #[tokio::test]
1325    async fn test_eventual_consistency_with_one() {
1326        let config = AmatersConfig {
1327            read_consistency: ConsistencyLevel::One,
1328            write_consistency: ConsistencyLevel::One,
1329            ..Default::default()
1330        };
1331
1332        assert_eq!(config.read_consistency, ConsistencyLevel::One);
1333        assert_eq!(config.write_consistency, ConsistencyLevel::One);
1334    }
1335
1336    #[tokio::test]
1337    async fn test_eventual_consistency_with_all() {
1338        let config = AmatersConfig {
1339            read_consistency: ConsistencyLevel::All,
1340            write_consistency: ConsistencyLevel::All,
1341            ..Default::default()
1342        };
1343
1344        assert_eq!(config.read_consistency, ConsistencyLevel::All);
1345        assert_eq!(config.write_consistency, ConsistencyLevel::All);
1346    }
1347
1348    #[test]
1349    fn test_message_record_with_headers() {
1350        let mut headers = HashMap::new();
1351        headers.insert("From".to_string(), "sender@example.com".to_string());
1352        headers.insert("To".to_string(), "recipient@example.com".to_string());
1353
1354        let record = MessageRecord {
1355            id: "msg-id".to_string(),
1356            mailbox_id: "mailbox-id".to_string(),
1357            uid: 1,
1358            sender: Some("sender@example.com".to_string()),
1359            recipients: vec!["recipient@example.com".to_string()],
1360            headers,
1361            size: 1024,
1362            blob_key: "blob:msg-id".to_string(),
1363            created_at: 1234567890,
1364        };
1365
1366        assert_eq!(record.headers.len(), 2);
1367        assert_eq!(
1368            record.headers.get("From"),
1369            Some(&"sender@example.com".to_string())
1370        );
1371    }
1372
1373    #[tokio::test]
1374    async fn test_failover_retry_backoff() {
1375        let config = AmatersConfig {
1376            max_retries: 3,
1377            ..Default::default()
1378        };
1379
1380        let client = AmatersClient::new(config);
1381        client.connect().await.unwrap();
1382
1383        // Put operation should succeed with retries
1384        let result = client
1385            .put("metadata", "test-key".to_string(), vec![1, 2, 3])
1386            .await;
1387        assert!(result.is_ok());
1388    }
1389}