enigma_relay/
store.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD;
5use base64::Engine;
6use serde::{Deserialize, Serialize};
7use uuid::Uuid;
8
9use crate::config::RelayLimits;
10use crate::error::{EnigmaRelayError, Result};
11use crate::model::MessageMeta;
12
13pub type DynRelayStore = Arc<dyn RelayStore + Send + Sync>;
14
15#[derive(Clone, Debug)]
16pub struct InboundMessage {
17    pub recipient: String,
18    pub message_id: Uuid,
19    pub ciphertext_b64: String,
20    pub meta: MessageMeta,
21    pub payload_bytes: u64,
22    pub arrival_ms: u64,
23    pub deadline_ms: u64,
24}
25
26#[derive(Clone, Serialize, Deserialize, Debug)]
27pub struct QueueMessage {
28    pub recipient: String,
29    pub message_id: Uuid,
30    pub ciphertext_b64: String,
31    pub meta: MessageMeta,
32    pub arrival_ms: u64,
33    pub deadline_ms: u64,
34    pub payload_bytes: u64,
35    pub arrival_seq: u64,
36}
37
/// Outcome reported by [`RelayStore::push`].
///
/// NOTE(review): the per-field descriptions are inferred from field names;
/// confirm them against the concrete store implementations.
#[derive(Debug, Clone)]
pub struct PushResult {
    /// Whether the message was stored.
    pub stored: bool,
    /// Whether the message was recognised as a duplicate.
    pub duplicate: bool,
    /// Queue length reported by the store (presumably after the push — TODO confirm).
    pub queue_len: u64,
    /// Queue size in bytes reported by the store (presumably after the push — TODO confirm).
    pub queue_bytes: u64,
}
45
46#[derive(Clone, Debug)]
47pub struct PullBatch {
48    pub items: Vec<QueueMessage>,
49    pub next_cursor: Option<String>,
50    pub remaining_estimate: u64,
51}
52
/// Counters reported by [`RelayStore::ack`].
///
/// `Default` yields all counters set to zero.
///
/// NOTE(review): the per-field descriptions are inferred from field names;
/// confirm them against the concrete store implementations.
#[derive(Debug, Default, Clone)]
pub struct AckOutcome {
    /// Number of acknowledged entries that were deleted.
    pub deleted: u64,
    /// Number of acknowledged entries that were not found.
    pub missing: u64,
    /// Number of entries remaining (presumably in the recipient's queue — TODO confirm).
    pub remaining: u64,
}
59
60#[derive(Clone, Debug, Eq, PartialEq, Hash)]
61pub struct AckItem {
62    pub message_id: Uuid,
63    pub chunk_index: u32,
64}
65
66#[async_trait]
67pub trait RelayStore: Send + Sync {
68    async fn push(&self, msg: InboundMessage, limits: &RelayLimits) -> Result<PushResult>;
69    async fn pull(
70        &self,
71        recipient: &str,
72        cursor: Option<String>,
73        limit: u64,
74        now_ms: u64,
75    ) -> Result<PullBatch>;
76    async fn ack(&self, recipient: &str, items: Vec<AckItem>) -> Result<AckOutcome>;
77    async fn purge_expired(&self, now_ms: u64, max: usize) -> Result<usize>;
78}
79
80pub fn encode_cursor(recipient: &str, arrival_seq: u64) -> String {
81    let raw = format!("{}:{:020}", recipient, arrival_seq);
82    STANDARD.encode(raw.as_bytes())
83}
84
85pub fn decode_cursor(cursor: &str) -> Result<(String, u64)> {
86    let decoded = STANDARD
87        .decode(cursor.as_bytes())
88        .map_err(|_| EnigmaRelayError::InvalidInput("invalid cursor".to_string()))?;
89    let text = String::from_utf8(decoded)
90        .map_err(|_| EnigmaRelayError::InvalidInput("invalid cursor".to_string()))?;
91    let mut parts = text.splitn(2, ':');
92    let recipient = parts
93        .next()
94        .ok_or_else(|| EnigmaRelayError::InvalidInput("invalid cursor".to_string()))?;
95    let seq = parts
96        .next()
97        .ok_or_else(|| EnigmaRelayError::InvalidInput("invalid cursor".to_string()))?;
98    let arrival_seq = seq
99        .parse::<u64>()
100        .map_err(|_| EnigmaRelayError::InvalidInput("invalid cursor".to_string()))?;
101    Ok((recipient.to_string(), arrival_seq))
102}