1use crate::traits::{MailboxStore, MessageStore, MetadataStore, StorageBackend};
40use crate::types::{
41 Mailbox, MailboxCounters, MailboxId, MailboxPath, MessageFlags, MessageMetadata, Quota,
42 SearchCriteria,
43};
44use async_trait::async_trait;
45use rusmes_proto::{Mail, MessageId, Username};
46use serde::{Deserialize, Serialize};
47use std::collections::HashMap;
48use std::sync::Arc;
49use tokio::sync::RwLock;
50
51#[derive(Debug, Clone)]
53pub struct AmatersConfig {
54 pub cluster_endpoints: Vec<String>,
56 pub metadata_keyspace: String,
58 pub blob_keyspace: String,
60 pub replication_factor: usize,
62 pub read_consistency: ConsistencyLevel,
64 pub write_consistency: ConsistencyLevel,
66 pub timeout_ms: u64,
68 pub max_retries: usize,
70 pub enable_compression: bool,
72 pub circuit_breaker_threshold: usize,
74 pub circuit_breaker_timeout_ms: u64,
76}
77
78impl Default for AmatersConfig {
79 fn default() -> Self {
80 Self {
81 cluster_endpoints: vec!["localhost:9042".to_string()],
82 metadata_keyspace: "rusmes_metadata".to_string(),
83 blob_keyspace: "rusmes_blobs".to_string(),
84 replication_factor: 3,
85 read_consistency: ConsistencyLevel::Quorum,
86 write_consistency: ConsistencyLevel::Quorum,
87 timeout_ms: 10000,
88 max_retries: 3,
89 enable_compression: true,
90 circuit_breaker_threshold: 5,
91 circuit_breaker_timeout_ms: 60000,
92 }
93 }
94}
95
96#[derive(Debug, Clone, Copy, PartialEq, Eq)]
98pub enum ConsistencyLevel {
99 All,
101 Quorum,
103 One,
105 LocalQuorum,
107}
108
109#[derive(Debug, Clone, Serialize, Deserialize)]
111struct MailboxRecord {
112 id: String,
113 username: String,
114 path: Vec<String>,
115 uid_validity: u32,
116 uid_next: u32,
117 special_use: Option<String>,
118 created_at: i64,
119}
120
121#[derive(Debug, Clone, Serialize, Deserialize)]
123struct MessageRecord {
124 id: String,
125 mailbox_id: String,
126 uid: u32,
127 sender: Option<String>,
128 recipients: Vec<String>,
129 headers: HashMap<String, String>,
130 size: usize,
131 blob_key: String,
132 created_at: i64,
133}
134
135#[derive(Debug, Clone, Serialize, Deserialize)]
137struct MessageBlob {
138 message_id: String,
139 body: Vec<u8>,
140 compressed: bool,
141}
142
143#[derive(Debug, Clone)]
145enum CircuitBreakerState {
146 Closed,
147 Open { opened_at: std::time::Instant },
148 HalfOpen,
149}
150
151struct CircuitBreaker {
153 state: Arc<RwLock<CircuitBreakerState>>,
154 failure_count: Arc<RwLock<usize>>,
155 threshold: usize,
156 timeout_ms: u64,
157}
158
159impl CircuitBreaker {
160 fn new(threshold: usize, timeout_ms: u64) -> Self {
161 Self {
162 state: Arc::new(RwLock::new(CircuitBreakerState::Closed)),
163 failure_count: Arc::new(RwLock::new(0)),
164 threshold,
165 timeout_ms,
166 }
167 }
168
169 async fn is_open(&self) -> bool {
170 let state = self.state.read().await;
171 matches!(*state, CircuitBreakerState::Open { .. })
172 }
173
174 async fn record_success(&self) {
175 let mut count = self.failure_count.write().await;
176 *count = 0;
177 let mut state = self.state.write().await;
178 *state = CircuitBreakerState::Closed;
179 }
180
181 async fn record_failure(&self) {
182 let mut count = self.failure_count.write().await;
183 *count += 1;
184
185 if *count >= self.threshold {
186 let mut state = self.state.write().await;
187 *state = CircuitBreakerState::Open {
188 opened_at: std::time::Instant::now(),
189 };
190 }
191 }
192
193 async fn attempt_reset(&self) {
194 let state = self.state.read().await;
195 if let CircuitBreakerState::Open { opened_at } = *state {
196 if opened_at.elapsed().as_millis() as u64 >= self.timeout_ms {
197 drop(state);
198 let mut state = self.state.write().await;
199 *state = CircuitBreakerState::HalfOpen;
200 }
201 }
202 }
203}
204
205struct AmatersClient {
208 config: AmatersConfig,
209 metadata: Arc<RwLock<HashMap<String, Vec<u8>>>>,
210 blobs: Arc<RwLock<HashMap<String, Vec<u8>>>>,
211 circuit_breaker: CircuitBreaker,
212}
213
214impl AmatersClient {
215 fn new(config: AmatersConfig) -> Self {
216 let circuit_breaker = CircuitBreaker::new(
217 config.circuit_breaker_threshold,
218 config.circuit_breaker_timeout_ms,
219 );
220
221 Self {
222 config,
223 metadata: Arc::new(RwLock::new(HashMap::new())),
224 blobs: Arc::new(RwLock::new(HashMap::new())),
225 circuit_breaker,
226 }
227 }
228
229 async fn connect(&self) -> anyhow::Result<()> {
230 tracing::info!(
232 "Connecting to AmateRS cluster at {:?}",
233 self.config.cluster_endpoints
234 );
235
236 if self.circuit_breaker.is_open().await {
238 self.circuit_breaker.attempt_reset().await;
239 if self.circuit_breaker.is_open().await {
240 return Err(anyhow::anyhow!("Circuit breaker is open"));
241 }
242 }
243
244 Ok(())
245 }
246
247 async fn init_keyspaces(&self) -> anyhow::Result<()> {
248 tracing::info!(
250 "Initializing keyspaces: {} and {}",
251 self.config.metadata_keyspace,
252 self.config.blob_keyspace
253 );
254 Ok(())
255 }
256
257 async fn put(&self, keyspace: &str, key: String, value: Vec<u8>) -> anyhow::Result<()> {
258 if self.circuit_breaker.is_open().await {
260 self.circuit_breaker.attempt_reset().await;
261 if self.circuit_breaker.is_open().await {
262 return Err(anyhow::anyhow!(
263 "Circuit breaker is open, rejecting request"
264 ));
265 }
266 }
267
268 let store = if keyspace.contains("blob") {
269 &self.blobs
270 } else {
271 &self.metadata
272 };
273
274 let mut last_error = None;
276 for attempt in 0..self.config.max_retries {
277 match self.put_with_retry(store, key.clone(), value.clone()).await {
278 Ok(_) => {
279 self.circuit_breaker.record_success().await;
280 return Ok(());
281 }
282 Err(e) => {
283 tracing::warn!("Put failed (attempt {}): {}", attempt + 1, e);
284 last_error = Some(e);
285
286 if attempt < self.config.max_retries - 1 {
287 let backoff = 100 * 2_u64.pow(attempt as u32);
289 tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
290 }
291 }
292 }
293 }
294
295 self.circuit_breaker.record_failure().await;
297 Err(last_error.unwrap_or_else(|| anyhow::anyhow!("Put operation failed")))
298 }
299
300 async fn put_with_retry(
301 &self,
302 store: &Arc<RwLock<HashMap<String, Vec<u8>>>>,
303 key: String,
304 value: Vec<u8>,
305 ) -> anyhow::Result<()> {
306 let mut map = store.write().await;
307 map.insert(key, value);
308 Ok(())
309 }
310
311 async fn get(&self, keyspace: &str, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
312 let store = if keyspace.contains("blob") {
313 &self.blobs
314 } else {
315 &self.metadata
316 };
317
318 let map = store.read().await;
319 Ok(map.get(key).cloned())
320 }
321
322 async fn delete(&self, keyspace: &str, key: &str) -> anyhow::Result<()> {
323 let store = if keyspace.contains("blob") {
324 &self.blobs
325 } else {
326 &self.metadata
327 };
328
329 let mut map = store.write().await;
330 map.remove(key);
331 Ok(())
332 }
333
334 async fn list_prefix(&self, keyspace: &str, prefix: &str) -> anyhow::Result<Vec<String>> {
335 let store = if keyspace.contains("blob") {
336 &self.blobs
337 } else {
338 &self.metadata
339 };
340
341 let map = store.read().await;
342 Ok(map
343 .keys()
344 .filter(|k| k.starts_with(prefix))
345 .cloned()
346 .collect())
347 }
348}
349
350pub struct AmatersBackend {
352 client: Arc<AmatersClient>,
353 config: AmatersConfig,
354}
355
356impl AmatersBackend {
357 pub async fn new(config: AmatersConfig) -> anyhow::Result<Self> {
359 let client = Arc::new(AmatersClient::new(config.clone()));
360 client.connect().await?;
361 client.init_keyspaces().await?;
362
363 Ok(Self { client, config })
364 }
365
366 pub async fn init_schema(&self) -> anyhow::Result<()> {
368 self.client.init_keyspaces().await
370 }
371}
372
373impl StorageBackend for AmatersBackend {
374 fn mailbox_store(&self) -> Arc<dyn MailboxStore> {
375 Arc::new(AmatersMailboxStore {
376 client: self.client.clone(),
377 keyspace: self.config.metadata_keyspace.clone(),
378 })
379 }
380
381 fn message_store(&self) -> Arc<dyn MessageStore> {
382 Arc::new(AmatersMessageStore {
383 client: self.client.clone(),
384 metadata_keyspace: self.config.metadata_keyspace.clone(),
385 blob_keyspace: self.config.blob_keyspace.clone(),
386 })
387 }
388
389 fn metadata_store(&self) -> Arc<dyn MetadataStore> {
390 Arc::new(AmatersMetadataStore {
391 client: self.client.clone(),
392 keyspace: self.config.metadata_keyspace.clone(),
393 })
394 }
395}
396
397struct AmatersMailboxStore {
399 client: Arc<AmatersClient>,
400 keyspace: String,
401}
402
403#[async_trait]
404impl MailboxStore for AmatersMailboxStore {
405 async fn create_mailbox(&self, path: &MailboxPath) -> anyhow::Result<MailboxId> {
406 let mailbox = Mailbox::new(path.clone());
407 let id = *mailbox.id();
408
409 let record = MailboxRecord {
410 id: id.to_string(),
411 username: path.user().to_string(),
412 path: path.path().to_vec(),
413 uid_validity: mailbox.uid_validity(),
414 uid_next: mailbox.uid_next(),
415 special_use: mailbox.special_use().map(|s| s.to_string()),
416 created_at: std::time::SystemTime::now()
417 .duration_since(std::time::UNIX_EPOCH)
418 .unwrap_or_default()
419 .as_secs() as i64,
420 };
421
422 let key = format!("mailbox:{}", id);
423 let value = serde_json::to_vec(&record)?;
424
425 self.client.put(&self.keyspace, key, value).await?;
426
427 let user_key = format!("user:{}:mailbox:{}", path.user(), id);
429 self.client.put(&self.keyspace, user_key, vec![]).await?;
430
431 Ok(id)
432 }
433
434 async fn delete_mailbox(&self, id: &MailboxId) -> anyhow::Result<()> {
435 let key = format!("mailbox:{}", id);
436 self.client.delete(&self.keyspace, &key).await?;
437 Ok(())
438 }
439
440 async fn rename_mailbox(&self, id: &MailboxId, new_path: &MailboxPath) -> anyhow::Result<()> {
441 let key = format!("mailbox:{}", id);
442 let data = self
443 .client
444 .get(&self.keyspace, &key)
445 .await?
446 .ok_or_else(|| anyhow::anyhow!("Mailbox not found"))?;
447
448 let mut record: MailboxRecord = serde_json::from_slice(&data)?;
449 record.path = new_path.path().to_vec();
450
451 let value = serde_json::to_vec(&record)?;
452 self.client.put(&self.keyspace, key, value).await?;
453
454 Ok(())
455 }
456
457 async fn get_mailbox(&self, id: &MailboxId) -> anyhow::Result<Option<Mailbox>> {
458 let key = format!("mailbox:{}", id);
459 let data = match self.client.get(&self.keyspace, &key).await? {
460 Some(d) => d,
461 None => return Ok(None),
462 };
463
464 let record: MailboxRecord = serde_json::from_slice(&data)?;
465 let username = Username::new(record.username)
466 .map_err(|e| anyhow::anyhow!("Invalid username: {}", e))?;
467 let path = MailboxPath::new(username, record.path);
468
469 let mut mailbox = Mailbox::new(path);
470 mailbox.set_special_use(record.special_use);
471
472 Ok(Some(mailbox))
473 }
474
475 async fn list_mailboxes(&self, user: &Username) -> anyhow::Result<Vec<Mailbox>> {
476 let prefix = format!("user:{}:mailbox:", user);
477 let keys = self.client.list_prefix(&self.keyspace, &prefix).await?;
478
479 let mut mailboxes = Vec::new();
480 for key in keys {
481 if let Some(mailbox_id_str) = key.strip_prefix(&prefix) {
482 let mailbox_key = format!("mailbox:{}", mailbox_id_str);
483 if let Ok(Some(data)) = self.client.get(&self.keyspace, &mailbox_key).await {
484 if let Ok(record) = serde_json::from_slice::<MailboxRecord>(&data) {
485 if let Ok(username) = Username::new(record.username) {
486 let path = MailboxPath::new(username, record.path);
487 let mut mailbox = Mailbox::new(path);
488 mailbox.set_special_use(record.special_use);
489 mailboxes.push(mailbox);
490 }
491 }
492 }
493 }
494 }
495
496 Ok(mailboxes)
497 }
498
499 async fn get_user_inbox(&self, user: &Username) -> anyhow::Result<Option<MailboxId>> {
500 let prefix = format!("user:{}:mailbox:", user);
501 let keys = self.client.list_prefix(&self.keyspace, &prefix).await?;
502
503 for key in keys {
504 if let Some(mailbox_id_str) = key.strip_prefix(&prefix) {
505 let mailbox_key = format!("mailbox:{}", mailbox_id_str);
506 if let Ok(Some(data)) = self.client.get(&self.keyspace, &mailbox_key).await {
507 if let Ok(record) = serde_json::from_slice::<MailboxRecord>(&data) {
508 let is_inbox = record.path == vec!["INBOX"]
509 || record
510 .special_use
511 .as_deref()
512 .map(|s| s.eq_ignore_ascii_case("inbox"))
513 .unwrap_or(false);
514 if is_inbox {
515 if let Ok(uuid) = uuid::Uuid::parse_str(mailbox_id_str) {
516 return Ok(Some(MailboxId::from_uuid(uuid)));
517 }
518 }
519 }
520 }
521 }
522 }
523
524 Ok(None)
525 }
526
527 async fn subscribe_mailbox(&self, user: &Username, mailbox_name: String) -> anyhow::Result<()> {
528 let key = format!("subscription:{}:{}", user, mailbox_name);
529 self.client.put(&self.keyspace, key, vec![1]).await?;
530 Ok(())
531 }
532
533 async fn unsubscribe_mailbox(&self, user: &Username, mailbox_name: &str) -> anyhow::Result<()> {
534 let key = format!("subscription:{}:{}", user, mailbox_name);
535 self.client.delete(&self.keyspace, &key).await?;
536 Ok(())
537 }
538
539 async fn list_subscriptions(&self, user: &Username) -> anyhow::Result<Vec<String>> {
540 let prefix = format!("subscription:{}:", user);
541 let keys = self.client.list_prefix(&self.keyspace, &prefix).await?;
542
543 Ok(keys
544 .into_iter()
545 .filter_map(|k| k.strip_prefix(&prefix).map(|s| s.to_string()))
546 .collect())
547 }
548}
549
550struct AmatersMessageStore {
552 client: Arc<AmatersClient>,
553 metadata_keyspace: String,
554 blob_keyspace: String,
555}
556
557#[async_trait]
558impl MessageStore for AmatersMessageStore {
559 async fn append_message(
560 &self,
561 mailbox_id: &MailboxId,
562 message: Mail,
563 ) -> anyhow::Result<MessageMetadata> {
564 let message_id = *message.message_id();
565
566 let blob = MessageBlob {
569 message_id: message_id.to_string(),
570 body: vec![], compressed: false,
572 };
573
574 let blob_key = format!("blob:{}", message_id);
575 let blob_value = serde_json::to_vec(&blob)?;
576 self.client
577 .put(&self.blob_keyspace, blob_key.clone(), blob_value)
578 .await?;
579
580 let record = MessageRecord {
582 id: message_id.to_string(),
583 mailbox_id: mailbox_id.to_string(),
584 uid: 1, sender: message.sender().map(|s| s.to_string()),
586 recipients: message.recipients().iter().map(|r| r.to_string()).collect(),
587 headers: HashMap::new(),
588 size: message.size(),
589 blob_key,
590 created_at: std::time::SystemTime::now()
591 .duration_since(std::time::UNIX_EPOCH)
592 .unwrap_or_default()
593 .as_secs() as i64,
594 };
595
596 let metadata_key = format!("message:{}", message_id);
597 let metadata_value = serde_json::to_vec(&record)?;
598 self.client
599 .put(&self.metadata_keyspace, metadata_key, metadata_value)
600 .await?;
601
602 let mailbox_index_key = format!("mailbox:{}:message:{}", mailbox_id, message_id);
604 self.client
605 .put(&self.metadata_keyspace, mailbox_index_key, vec![])
606 .await?;
607
608 let metadata = MessageMetadata::new(
609 message_id,
610 *mailbox_id,
611 1,
612 MessageFlags::new(),
613 message.size(),
614 );
615
616 Ok(metadata)
617 }
618
619 async fn get_message(&self, _message_id: &MessageId) -> anyhow::Result<Option<Mail>> {
620 Ok(None)
623 }
624
625 async fn delete_messages(&self, message_ids: &[MessageId]) -> anyhow::Result<()> {
626 for message_id in message_ids {
627 let key = format!("message:{}", message_id);
628
629 if let Some(data) = self.client.get(&self.metadata_keyspace, &key).await? {
631 if let Ok(record) = serde_json::from_slice::<MessageRecord>(&data) {
632 self.client
633 .delete(&self.blob_keyspace, &record.blob_key)
634 .await?;
635 }
636 }
637
638 self.client.delete(&self.metadata_keyspace, &key).await?;
639 }
640 Ok(())
641 }
642
643 async fn set_flags(
644 &self,
645 message_ids: &[MessageId],
646 _flags: MessageFlags,
647 ) -> anyhow::Result<()> {
648 for message_id in message_ids {
650 let key = format!("flags:{}", message_id);
651 let value = vec![1]; self.client.put(&self.metadata_keyspace, key, value).await?;
653 }
654 Ok(())
655 }
656
657 async fn search(
658 &self,
659 mailbox_id: &MailboxId,
660 _criteria: SearchCriteria,
661 ) -> anyhow::Result<Vec<MessageId>> {
662 let prefix = format!("mailbox:{}:message:", mailbox_id);
663 let keys = self
664 .client
665 .list_prefix(&self.metadata_keyspace, &prefix)
666 .await?;
667
668 let message_ids = keys
669 .into_iter()
670 .filter_map(|k| k.strip_prefix(&prefix).map(|_id_str| MessageId::new()))
671 .collect();
672
673 Ok(message_ids)
674 }
675
676 async fn copy_messages(
677 &self,
678 message_ids: &[MessageId],
679 dest_mailbox_id: &MailboxId,
680 ) -> anyhow::Result<Vec<MessageMetadata>> {
681 let mut metadata_list = Vec::new();
682
683 for message_id in message_ids {
684 if let Some(message) = self.get_message(message_id).await? {
685 let metadata = self.append_message(dest_mailbox_id, message).await?;
686 metadata_list.push(metadata);
687 }
688 }
689
690 Ok(metadata_list)
691 }
692
693 async fn get_mailbox_messages(
694 &self,
695 mailbox_id: &MailboxId,
696 ) -> anyhow::Result<Vec<MessageMetadata>> {
697 let prefix = format!("mailbox:{}:message:", mailbox_id);
698 let keys = self
699 .client
700 .list_prefix(&self.metadata_keyspace, &prefix)
701 .await?;
702
703 let mut metadata_list = Vec::new();
704 for key in keys {
705 if key.strip_prefix(&prefix).is_some() {
706 let message_id = MessageId::new();
707 let metadata =
708 MessageMetadata::new(message_id, *mailbox_id, 1, MessageFlags::new(), 0);
709 metadata_list.push(metadata);
710 }
711 }
712
713 Ok(metadata_list)
714 }
715}
716
717struct AmatersMetadataStore {
719 client: Arc<AmatersClient>,
720 keyspace: String,
721}
722
723#[async_trait]
724impl MetadataStore for AmatersMetadataStore {
725 async fn get_user_quota(&self, user: &Username) -> anyhow::Result<Quota> {
726 let key = format!("quota:{}", user);
727 let data = match self.client.get(&self.keyspace, &key).await? {
728 Some(d) => d,
729 None => return Ok(Quota::new(0, 1024 * 1024 * 1024)),
730 };
731
732 Ok(serde_json::from_slice(&data)?)
733 }
734
735 async fn set_user_quota(&self, user: &Username, quota: Quota) -> anyhow::Result<()> {
736 let key = format!("quota:{}", user);
737 let value = serde_json::to_vec("a)?;
738 self.client.put(&self.keyspace, key, value).await?;
739 Ok(())
740 }
741
742 async fn get_mailbox_counters(
743 &self,
744 mailbox_id: &MailboxId,
745 ) -> anyhow::Result<MailboxCounters> {
746 let key = format!("counters:{}", mailbox_id);
747 let data = match self.client.get(&self.keyspace, &key).await? {
748 Some(d) => d,
749 None => return Ok(MailboxCounters::default()),
750 };
751
752 Ok(serde_json::from_slice(&data)?)
753 }
754}
755
756#[cfg(test)]
759mod tests {
760 use super::*;
761
762 #[test]
763 fn test_amaters_config_default() {
764 let config = AmatersConfig::default();
765 assert_eq!(config.cluster_endpoints.len(), 1);
766 assert_eq!(config.replication_factor, 3);
767 assert_eq!(config.read_consistency, ConsistencyLevel::Quorum);
768 assert_eq!(config.write_consistency, ConsistencyLevel::Quorum);
769 }
770
771 #[test]
772 fn test_consistency_levels() {
773 assert_eq!(ConsistencyLevel::All, ConsistencyLevel::All);
774 assert_eq!(ConsistencyLevel::Quorum, ConsistencyLevel::Quorum);
775 assert_eq!(ConsistencyLevel::One, ConsistencyLevel::One);
776 assert_ne!(ConsistencyLevel::All, ConsistencyLevel::One);
777 }
778
779 #[tokio::test]
780 async fn test_amaters_client_creation() {
781 let config = AmatersConfig::default();
782 let client = AmatersClient::new(config);
783 assert!(client.connect().await.is_ok());
784 }
785
786 #[tokio::test]
787 async fn test_amaters_backend_creation() {
788 let config = AmatersConfig::default();
789 let backend = AmatersBackend::new(config).await;
790 assert!(backend.is_ok());
791 }
792
793 #[tokio::test]
794 async fn test_put_and_get() {
795 let config = AmatersConfig::default();
796 let client = AmatersClient::new(config);
797 client.connect().await.unwrap();
798
799 let key = "test_key".to_string();
800 let value = vec![1, 2, 3, 4];
801
802 client
803 .put("metadata", key.clone(), value.clone())
804 .await
805 .unwrap();
806 let retrieved = client.get("metadata", &key).await.unwrap();
807
808 assert_eq!(retrieved, Some(value));
809 }
810
811 #[tokio::test]
812 async fn test_delete() {
813 let config = AmatersConfig::default();
814 let client = AmatersClient::new(config);
815 client.connect().await.unwrap();
816
817 let key = "delete_key".to_string();
818 let value = vec![5, 6, 7, 8];
819
820 client.put("metadata", key.clone(), value).await.unwrap();
821 client.delete("metadata", &key).await.unwrap();
822
823 let retrieved = client.get("metadata", &key).await.unwrap();
824 assert_eq!(retrieved, None);
825 }
826
827 #[tokio::test]
828 async fn test_list_prefix() {
829 let config = AmatersConfig::default();
830 let client = AmatersClient::new(config);
831 client.connect().await.unwrap();
832
833 client
834 .put("metadata", "user:alice:mailbox:1".to_string(), vec![])
835 .await
836 .unwrap();
837 client
838 .put("metadata", "user:alice:mailbox:2".to_string(), vec![])
839 .await
840 .unwrap();
841 client
842 .put("metadata", "user:bob:mailbox:1".to_string(), vec![])
843 .await
844 .unwrap();
845
846 let alice_mailboxes = client.list_prefix("metadata", "user:alice:").await.unwrap();
847 assert_eq!(alice_mailboxes.len(), 2);
848 }
849
850 #[test]
851 fn test_mailbox_record_serialization() {
852 let record = MailboxRecord {
853 id: "test-id".to_string(),
854 username: "user@example.com".to_string(),
855 path: vec!["INBOX".to_string()],
856 uid_validity: 1,
857 uid_next: 1,
858 special_use: None,
859 created_at: 1234567890,
860 };
861
862 let serialized = serde_json::to_vec(&record).unwrap();
863 let deserialized: MailboxRecord = serde_json::from_slice(&serialized).unwrap();
864
865 assert_eq!(record.id, deserialized.id);
866 assert_eq!(record.username, deserialized.username);
867 }
868
869 #[test]
870 fn test_message_record_serialization() {
871 let record = MessageRecord {
872 id: "msg-id".to_string(),
873 mailbox_id: "mailbox-id".to_string(),
874 uid: 1,
875 sender: Some("sender@example.com".to_string()),
876 recipients: vec!["recipient@example.com".to_string()],
877 headers: HashMap::new(),
878 size: 1024,
879 blob_key: "blob:msg-id".to_string(),
880 created_at: 1234567890,
881 };
882
883 let serialized = serde_json::to_vec(&record).unwrap();
884 let deserialized: MessageRecord = serde_json::from_slice(&serialized).unwrap();
885
886 assert_eq!(record.id, deserialized.id);
887 assert_eq!(record.size, deserialized.size);
888 }
889
890 #[test]
891 fn test_message_blob_serialization() {
892 let blob = MessageBlob {
893 message_id: "msg-id".to_string(),
894 body: vec![1, 2, 3, 4],
895 compressed: false,
896 };
897
898 let serialized = serde_json::to_vec(&blob).unwrap();
899 let deserialized: MessageBlob = serde_json::from_slice(&serialized).unwrap();
900
901 assert_eq!(blob.message_id, deserialized.message_id);
902 assert_eq!(blob.body, deserialized.body);
903 }
904
905 #[test]
906 fn test_amaters_config_custom() {
907 let config = AmatersConfig {
908 cluster_endpoints: vec![
909 "node1.example.com:9042".to_string(),
910 "node2.example.com:9042".to_string(),
911 ],
912 replication_factor: 5,
913 read_consistency: ConsistencyLevel::LocalQuorum,
914 write_consistency: ConsistencyLevel::All,
915 ..Default::default()
916 };
917
918 assert_eq!(config.cluster_endpoints.len(), 2);
919 assert_eq!(config.replication_factor, 5);
920 }
921
922 #[test]
923 fn test_keyspace_configuration() {
924 let config = AmatersConfig {
925 metadata_keyspace: "custom_metadata".to_string(),
926 blob_keyspace: "custom_blobs".to_string(),
927 ..Default::default()
928 };
929
930 assert_eq!(config.metadata_keyspace, "custom_metadata");
931 assert_eq!(config.blob_keyspace, "custom_blobs");
932 }
933
934 #[test]
935 fn test_compression_flag() {
936 let config = AmatersConfig {
937 enable_compression: true,
938 ..Default::default()
939 };
940 assert!(config.enable_compression);
941
942 let config_no_compression = AmatersConfig {
943 enable_compression: false,
944 ..Default::default()
945 };
946 assert!(!config_no_compression.enable_compression);
947 }
948
949 #[test]
950 fn test_retry_configuration() {
951 let config = AmatersConfig {
952 max_retries: 5,
953 ..Default::default()
954 };
955 assert_eq!(config.max_retries, 5);
956 }
957
958 #[test]
959 fn test_timeout_configuration() {
960 let config = AmatersConfig {
961 timeout_ms: 30000,
962 ..Default::default()
963 };
964 assert_eq!(config.timeout_ms, 30000);
965 }
966
967 #[tokio::test]
968 async fn test_init_keyspaces() {
969 let config = AmatersConfig::default();
970 let client = AmatersClient::new(config);
971 assert!(client.init_keyspaces().await.is_ok());
972 }
973
974 #[tokio::test]
975 async fn test_blob_keyspace_separation() {
976 let config = AmatersConfig::default();
977 let client = AmatersClient::new(config);
978
979 client
980 .put("metadata", "key1".to_string(), vec![1])
981 .await
982 .unwrap();
983 client
984 .put("blobs", "key2".to_string(), vec![2])
985 .await
986 .unwrap();
987
988 let meta_val = client.get("metadata", "key1").await.unwrap();
989 let blob_val = client.get("blobs", "key2").await.unwrap();
990
991 assert_eq!(meta_val, Some(vec![1]));
992 assert_eq!(blob_val, Some(vec![2]));
993 }
994
995 #[tokio::test]
996 async fn test_multiple_contact_points() {
997 let config = AmatersConfig {
998 cluster_endpoints: vec![
999 "host1:9042".to_string(),
1000 "host2:9042".to_string(),
1001 "host3:9042".to_string(),
1002 ],
1003 ..Default::default()
1004 };
1005
1006 let client = AmatersClient::new(config);
1007 assert!(client.connect().await.is_ok());
1008 }
1009
1010 #[test]
1011 fn test_circuit_breaker_creation() {
1012 let cb = CircuitBreaker::new(5, 60000);
1013 assert_eq!(cb.threshold, 5);
1014 assert_eq!(cb.timeout_ms, 60000);
1015 }
1016
1017 #[tokio::test]
1018 async fn test_circuit_breaker_closed_initially() {
1019 let cb = CircuitBreaker::new(3, 60000);
1020 assert!(!cb.is_open().await);
1021 }
1022
1023 #[tokio::test]
1024 async fn test_circuit_breaker_opens_after_threshold() {
1025 let cb = CircuitBreaker::new(3, 60000);
1026
1027 cb.record_failure().await;
1028 assert!(!cb.is_open().await);
1029
1030 cb.record_failure().await;
1031 assert!(!cb.is_open().await);
1032
1033 cb.record_failure().await;
1034 assert!(cb.is_open().await);
1035 }
1036
1037 #[tokio::test]
1038 async fn test_circuit_breaker_reset_on_success() {
1039 let cb = CircuitBreaker::new(3, 60000);
1040
1041 cb.record_failure().await;
1042 cb.record_failure().await;
1043 assert!(!cb.is_open().await);
1044
1045 cb.record_success().await;
1046 let count = cb.failure_count.read().await;
1047 assert_eq!(*count, 0);
1048 }
1049
1050 #[tokio::test]
1051 async fn test_circuit_breaker_half_open_after_timeout() {
1052 let cb = CircuitBreaker::new(2, 100); cb.record_failure().await;
1055 cb.record_failure().await;
1056 assert!(cb.is_open().await);
1057
1058 tokio::time::sleep(tokio::time::Duration::from_millis(150)).await;
1059 cb.attempt_reset().await;
1060
1061 let state = cb.state.read().await;
1062 assert!(matches!(*state, CircuitBreakerState::HalfOpen));
1063 }
1064
1065 #[tokio::test]
1066 async fn test_config_cluster_endpoints() {
1067 let config = AmatersConfig::default();
1068 assert_eq!(config.cluster_endpoints.len(), 1);
1069 assert_eq!(config.cluster_endpoints[0], "localhost:9042");
1070 }
1071
1072 #[tokio::test]
1073 async fn test_config_timeout_ms() {
1074 let config = AmatersConfig {
1075 timeout_ms: 5000,
1076 ..Default::default()
1077 };
1078 assert_eq!(config.timeout_ms, 5000);
1079 }
1080
1081 #[tokio::test]
1082 async fn test_config_circuit_breaker_settings() {
1083 let config = AmatersConfig {
1084 circuit_breaker_threshold: 10,
1085 circuit_breaker_timeout_ms: 120000,
1086 ..Default::default()
1087 };
1088 assert_eq!(config.circuit_breaker_threshold, 10);
1089 assert_eq!(config.circuit_breaker_timeout_ms, 120000);
1090 }
1091
1092 #[tokio::test]
1093 async fn test_put_records_success() {
1094 let config = AmatersConfig::default();
1095 let client = AmatersClient::new(config);
1096 client.connect().await.unwrap();
1097
1098 client
1099 .put("metadata", "key1".to_string(), vec![1, 2, 3])
1100 .await
1101 .unwrap();
1102
1103 let count = client.circuit_breaker.failure_count.read().await;
1104 assert_eq!(*count, 0);
1105 }
1106
1107 #[tokio::test]
1108 async fn test_get_nonexistent_key() {
1109 let config = AmatersConfig::default();
1110 let client = AmatersClient::new(config);
1111
1112 let result = client.get("metadata", "nonexistent").await.unwrap();
1113 assert_eq!(result, None);
1114 }
1115
1116 #[tokio::test]
1117 async fn test_delete_nonexistent_key() {
1118 let config = AmatersConfig::default();
1119 let client = AmatersClient::new(config);
1120
1121 let result = client.delete("metadata", "nonexistent").await;
1122 assert!(result.is_ok());
1123 }
1124
1125 #[tokio::test]
1126 async fn test_list_prefix_empty() {
1127 let config = AmatersConfig::default();
1128 let client = AmatersClient::new(config);
1129
1130 let keys = client.list_prefix("metadata", "empty:").await.unwrap();
1131 assert_eq!(keys.len(), 0);
1132 }
1133
1134 #[tokio::test]
1135 async fn test_blob_and_metadata_separation() {
1136 let config = AmatersConfig::default();
1137 let client = AmatersClient::new(config);
1138
1139 client
1140 .put("metadata", "key1".to_string(), vec![1])
1141 .await
1142 .unwrap();
1143 client
1144 .put("blob_keyspace", "key1".to_string(), vec![2])
1145 .await
1146 .unwrap();
1147
1148 let meta = client.get("metadata", "key1").await.unwrap();
1149 let blob = client.get("blob_keyspace", "key1").await.unwrap();
1150
1151 assert_eq!(meta, Some(vec![1]));
1152 assert_eq!(blob, Some(vec![2]));
1153 }
1154
1155 #[tokio::test]
1156 async fn test_backend_stores_creation() {
1157 let config = AmatersConfig::default();
1158 let backend = AmatersBackend::new(config).await.unwrap();
1159
1160 let _mailbox_store = backend.mailbox_store();
1161 let _message_store = backend.message_store();
1162 let _metadata_store = backend.metadata_store();
1163 }
1164
1165 #[tokio::test]
1166 async fn test_init_schema() {
1167 let config = AmatersConfig::default();
1168 let backend = AmatersBackend::new(config).await.unwrap();
1169 assert!(backend.init_schema().await.is_ok());
1170 }
1171
1172 #[test]
1173 fn test_consistency_level_all() {
1174 let level = ConsistencyLevel::All;
1175 assert_eq!(level, ConsistencyLevel::All);
1176 }
1177
1178 #[test]
1179 fn test_consistency_level_one() {
1180 let level = ConsistencyLevel::One;
1181 assert_eq!(level, ConsistencyLevel::One);
1182 }
1183
1184 #[test]
1185 fn test_consistency_level_local_quorum() {
1186 let level = ConsistencyLevel::LocalQuorum;
1187 assert_eq!(level, ConsistencyLevel::LocalQuorum);
1188 }
1189
1190 #[tokio::test]
1191 async fn test_mailbox_subscription() {
1192 let config = AmatersConfig::default();
1193 let backend = AmatersBackend::new(config).await.unwrap();
1194 let store = backend.mailbox_store();
1195
1196 let user = Username::new("user@example.com".to_string()).unwrap();
1197 store
1198 .subscribe_mailbox(&user, "INBOX".to_string())
1199 .await
1200 .unwrap();
1201
1202 let subs = store.list_subscriptions(&user).await.unwrap();
1203 assert_eq!(subs.len(), 1);
1204 assert!(subs.contains(&"INBOX".to_string()));
1205 }
1206
1207 #[tokio::test]
1208 async fn test_mailbox_unsubscription() {
1209 let config = AmatersConfig::default();
1210 let backend = AmatersBackend::new(config).await.unwrap();
1211 let store = backend.mailbox_store();
1212
1213 let user = Username::new("user@example.com".to_string()).unwrap();
1214 store
1215 .subscribe_mailbox(&user, "INBOX".to_string())
1216 .await
1217 .unwrap();
1218 store.unsubscribe_mailbox(&user, "INBOX").await.unwrap();
1219
1220 let subs = store.list_subscriptions(&user).await.unwrap();
1221 assert_eq!(subs.len(), 0);
1222 }
1223
1224 #[tokio::test]
1225 async fn test_multiple_subscriptions() {
1226 let config = AmatersConfig::default();
1227 let backend = AmatersBackend::new(config).await.unwrap();
1228 let store = backend.mailbox_store();
1229
1230 let user = Username::new("user@example.com".to_string()).unwrap();
1231 store
1232 .subscribe_mailbox(&user, "INBOX".to_string())
1233 .await
1234 .unwrap();
1235 store
1236 .subscribe_mailbox(&user, "Sent".to_string())
1237 .await
1238 .unwrap();
1239 store
1240 .subscribe_mailbox(&user, "Drafts".to_string())
1241 .await
1242 .unwrap();
1243
1244 let subs = store.list_subscriptions(&user).await.unwrap();
1245 assert_eq!(subs.len(), 3);
1246 }
1247
1248 #[tokio::test]
1249 async fn test_quota_operations() {
1250 let config = AmatersConfig::default();
1251 let backend = AmatersBackend::new(config).await.unwrap();
1252 let store = backend.metadata_store();
1253
1254 let user = Username::new("user@example.com".to_string()).unwrap();
1255 let quota = Quota::new(1000, 10000);
1256
1257 store.set_user_quota(&user, quota).await.unwrap();
1258 let retrieved = store.get_user_quota(&user).await.unwrap();
1259
1260 assert_eq!(retrieved.used, 1000);
1261 assert_eq!(retrieved.limit, 10000);
1262 }
1263
1264 #[tokio::test]
1265 async fn test_mailbox_counters() {
1266 let config = AmatersConfig::default();
1267 let backend = AmatersBackend::new(config).await.unwrap();
1268 let store = backend.metadata_store();
1269
1270 let mailbox_id = MailboxId::new();
1271 let counters = store.get_mailbox_counters(&mailbox_id).await.unwrap();
1272
1273 assert_eq!(counters.exists, 0);
1274 assert_eq!(counters.recent, 0);
1275 assert_eq!(counters.unseen, 0);
1276 }
1277
1278 #[tokio::test]
1279 async fn test_message_blob_compression_flag() {
1280 let blob = MessageBlob {
1281 message_id: "test-id".to_string(),
1282 body: vec![1, 2, 3, 4, 5],
1283 compressed: true,
1284 };
1285
1286 assert!(blob.compressed);
1287 assert_eq!(blob.body.len(), 5);
1288 }
1289
1290 #[tokio::test]
1291 async fn test_replication_factor_config() {
1292 let config = AmatersConfig {
1293 replication_factor: 5,
1294 ..Default::default()
1295 };
1296
1297 assert_eq!(config.replication_factor, 5);
1298 }
1299
1300 #[tokio::test]
1301 async fn test_custom_keyspace_names() {
1302 let config = AmatersConfig {
1303 metadata_keyspace: "custom_meta".to_string(),
1304 blob_keyspace: "custom_blob".to_string(),
1305 ..Default::default()
1306 };
1307
1308 assert_eq!(config.metadata_keyspace, "custom_meta");
1309 assert_eq!(config.blob_keyspace, "custom_blob");
1310 }
1311
1312 #[tokio::test]
1313 async fn test_eventual_consistency_with_quorum() {
1314 let config = AmatersConfig {
1315 read_consistency: ConsistencyLevel::Quorum,
1316 write_consistency: ConsistencyLevel::Quorum,
1317 ..Default::default()
1318 };
1319
1320 assert_eq!(config.read_consistency, ConsistencyLevel::Quorum);
1321 assert_eq!(config.write_consistency, ConsistencyLevel::Quorum);
1322 }
1323
1324 #[tokio::test]
1325 async fn test_eventual_consistency_with_one() {
1326 let config = AmatersConfig {
1327 read_consistency: ConsistencyLevel::One,
1328 write_consistency: ConsistencyLevel::One,
1329 ..Default::default()
1330 };
1331
1332 assert_eq!(config.read_consistency, ConsistencyLevel::One);
1333 assert_eq!(config.write_consistency, ConsistencyLevel::One);
1334 }
1335
1336 #[tokio::test]
1337 async fn test_eventual_consistency_with_all() {
1338 let config = AmatersConfig {
1339 read_consistency: ConsistencyLevel::All,
1340 write_consistency: ConsistencyLevel::All,
1341 ..Default::default()
1342 };
1343
1344 assert_eq!(config.read_consistency, ConsistencyLevel::All);
1345 assert_eq!(config.write_consistency, ConsistencyLevel::All);
1346 }
1347
1348 #[test]
1349 fn test_message_record_with_headers() {
1350 let mut headers = HashMap::new();
1351 headers.insert("From".to_string(), "sender@example.com".to_string());
1352 headers.insert("To".to_string(), "recipient@example.com".to_string());
1353
1354 let record = MessageRecord {
1355 id: "msg-id".to_string(),
1356 mailbox_id: "mailbox-id".to_string(),
1357 uid: 1,
1358 sender: Some("sender@example.com".to_string()),
1359 recipients: vec!["recipient@example.com".to_string()],
1360 headers,
1361 size: 1024,
1362 blob_key: "blob:msg-id".to_string(),
1363 created_at: 1234567890,
1364 };
1365
1366 assert_eq!(record.headers.len(), 2);
1367 assert_eq!(
1368 record.headers.get("From"),
1369 Some(&"sender@example.com".to_string())
1370 );
1371 }
1372
1373 #[tokio::test]
1374 async fn test_failover_retry_backoff() {
1375 let config = AmatersConfig {
1376 max_retries: 3,
1377 ..Default::default()
1378 };
1379
1380 let client = AmatersClient::new(config);
1381 client.connect().await.unwrap();
1382
1383 let result = client
1385 .put("metadata", "test-key".to_string(), vec![1, 2, 3])
1386 .await;
1387 assert!(result.is_ok());
1388 }
1389}