1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD;
5use base64::Engine;
6use serde::{Deserialize, Serialize};
7use uuid::Uuid;
8
9use crate::config::RelayLimits;
10use crate::error::{EnigmaRelayError, Result};
11use crate::model::MessageMeta;
12
13pub type DynRelayStore = Arc<dyn RelayStore + Send + Sync>;
14
15#[derive(Clone, Debug)]
16pub struct InboundMessage {
17 pub recipient: String,
18 pub message_id: Uuid,
19 pub ciphertext_b64: String,
20 pub meta: MessageMeta,
21 pub payload_bytes: u64,
22 pub arrival_ms: u64,
23 pub deadline_ms: u64,
24}
25
26#[derive(Clone, Serialize, Deserialize, Debug)]
27pub struct QueueMessage {
28 pub recipient: String,
29 pub message_id: Uuid,
30 pub ciphertext_b64: String,
31 pub meta: MessageMeta,
32 pub arrival_ms: u64,
33 pub deadline_ms: u64,
34 pub payload_bytes: u64,
35 pub arrival_seq: u64,
36}
37
/// Outcome reported by [`RelayStore::push`].
#[derive(Debug, Clone)]
pub struct PushResult {
    /// Whether the message was stored.
    pub stored: bool,
    /// Whether the message was recognised as a duplicate.
    pub duplicate: bool,
    /// Queue length reported by the store; presumably for the message's
    /// recipient after the push — confirm with the implementation.
    pub queue_len: u64,
    /// Queue size in bytes reported by the store.
    pub queue_bytes: u64,
}
45
46#[derive(Clone, Debug)]
47pub struct PullBatch {
48 pub items: Vec<QueueMessage>,
49 pub next_cursor: Option<String>,
50 pub remaining_estimate: u64,
51}
52
/// Counters reported by [`RelayStore::ack`]; all zero by default.
#[derive(Debug, Default, Clone)]
pub struct AckOutcome {
    /// Number of acknowledged items that were deleted.
    pub deleted: u64,
    /// Number of acknowledged items that were not found.
    pub missing: u64,
    /// Number of items remaining; presumably in the recipient's queue after
    /// the acknowledgement — confirm with the implementation.
    pub remaining: u64,
}
59
60#[derive(Clone, Debug, Eq, PartialEq, Hash)]
61pub struct AckItem {
62 pub message_id: Uuid,
63 pub chunk_index: u32,
64}
65
66#[async_trait]
67pub trait RelayStore: Send + Sync {
68 async fn push(&self, msg: InboundMessage, limits: &RelayLimits) -> Result<PushResult>;
69 async fn pull(
70 &self,
71 recipient: &str,
72 cursor: Option<String>,
73 limit: u64,
74 now_ms: u64,
75 ) -> Result<PullBatch>;
76 async fn ack(&self, recipient: &str, items: Vec<AckItem>) -> Result<AckOutcome>;
77 async fn purge_expired(&self, now_ms: u64, max: usize) -> Result<usize>;
78}
79
80pub fn encode_cursor(recipient: &str, arrival_seq: u64) -> String {
81 let raw = format!("{}:{:020}", recipient, arrival_seq);
82 STANDARD.encode(raw.as_bytes())
83}
84
85pub fn decode_cursor(cursor: &str) -> Result<(String, u64)> {
86 let decoded = STANDARD
87 .decode(cursor.as_bytes())
88 .map_err(|_| EnigmaRelayError::InvalidInput("invalid cursor".to_string()))?;
89 let text = String::from_utf8(decoded)
90 .map_err(|_| EnigmaRelayError::InvalidInput("invalid cursor".to_string()))?;
91 let mut parts = text.splitn(2, ':');
92 let recipient = parts
93 .next()
94 .ok_or_else(|| EnigmaRelayError::InvalidInput("invalid cursor".to_string()))?;
95 let seq = parts
96 .next()
97 .ok_or_else(|| EnigmaRelayError::InvalidInput("invalid cursor".to_string()))?;
98 let arrival_seq = seq
99 .parse::<u64>()
100 .map_err(|_| EnigmaRelayError::InvalidInput("invalid cursor".to_string()))?;
101 Ok((recipient.to_string(), arrival_seq))
102}