commucat_storage/
lib.rs

1use chrono::{DateTime, Duration, Utc};
2use rand::{RngCore, rngs::OsRng};
3use serde_json::Value;
4use std::convert::TryInto;
5use std::error::Error;
6use std::fmt::{Display, Formatter};
7use std::str::FromStr;
8use std::sync::Arc;
9use tokio::sync::Mutex;
10use tokio::task::JoinHandle;
11use tokio_postgres::{Client, NoTls};
12
13const INIT_SQL: &str = include_str!("../migrations/001_init.sql");
14const PAIRING_SQL: &str = include_str!("../migrations/002_pairing.sql");
15const USER_BLOB_SQL: &str = include_str!("../migrations/003_user_blob.sql");
16const SERVER_SECRET_SQL: &str = include_str!("../migrations/004_server_secrets.sql");
17const DEVICE_ROTATION_SQL: &str = include_str!("../migrations/005_device_rotation.sql");
18const FEDERATION_OUTBOX_SQL: &str = include_str!("../migrations/006_federation_outbox.sql");
19const FRIEND_REQUESTS_SQL: &str = include_str!("../migrations/007_friend_requests.sql");
20const FEDERATED_IDENTIFIERS_SQL: &str = include_str!("../migrations/008_federated_identifiers.sql");
21const DEVICE_PQ_SQL: &str = include_str!("../migrations/009_device_pq.sql");
22const PAIRING_MAX_ATTEMPTS: i32 = 5;
23const PAIRING_CODE_LENGTH: usize = 8;
24const PAIRING_ALPHABET: &[u8] = b"ABCDEFGHJKMNPQRSTUVWXYZ23456789";
25
26#[derive(Debug)]
27pub enum StorageError {
28    Postgres,
29    Redis,
30    Serialization,
31    Missing,
32    Invalid,
33}
34
35impl Display for StorageError {
36    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
37        match self {
38            Self::Postgres => write!(f, "postgres failure"),
39            Self::Redis => write!(f, "redis failure"),
40            Self::Serialization => write!(f, "serialization failure"),
41            Self::Missing => write!(f, "missing record"),
42            Self::Invalid => write!(f, "invalid state"),
43        }
44    }
45}
46
47impl Error for StorageError {}
48
49pub struct Storage {
50    client: Client,
51    _pg_task: JoinHandle<()>,
52    redis: Arc<Mutex<redis::aio::MultiplexedConnection>>,
53}
54
55#[derive(Debug, Clone, PartialEq, Eq)]
56pub struct NewUserProfile {
57    pub user_id: String,
58    pub handle: String,
59    pub domain: String,
60    pub display_name: Option<String>,
61    pub avatar_url: Option<String>,
62}
63
64#[derive(Debug, Clone, PartialEq, Eq)]
65pub struct UserProfile {
66    pub user_id: String,
67    pub handle: String,
68    pub domain: String,
69    pub display_name: Option<String>,
70    pub avatar_url: Option<String>,
71    pub created_at: DateTime<Utc>,
72    pub updated_at: DateTime<Utc>,
73}
74
75#[derive(Debug, Clone, PartialEq, Eq)]
76pub struct DeviceRecord {
77    pub device_id: String,
78    pub user_id: String,
79    pub public_key: Vec<u8>,
80    pub status: String,
81    pub created_at: DateTime<Utc>,
82}
83
84#[derive(Debug, Clone, PartialEq, Eq)]
85pub struct DevicePqKeys {
86    pub device_id: String,
87    pub kem_public: Vec<u8>,
88    pub signature_public: Vec<u8>,
89    pub updated_at: DateTime<Utc>,
90}
91
92#[derive(Debug, Clone, PartialEq, Eq)]
93pub struct SessionRecord {
94    pub session_id: String,
95    pub user_id: String,
96    pub device_id: String,
97    pub tls_fingerprint: String,
98    pub created_at: DateTime<Utc>,
99    pub ttl_seconds: i64,
100}
101
102#[derive(Debug, Clone, PartialEq, Eq)]
103pub struct ServerSecretRecord {
104    pub name: String,
105    pub version: i64,
106    pub secret: Vec<u8>,
107    pub public: Option<Vec<u8>>,
108    pub metadata: Value,
109    pub created_at: DateTime<Utc>,
110    pub valid_after: DateTime<Utc>,
111    pub rotates_at: DateTime<Utc>,
112    pub expires_at: DateTime<Utc>,
113}
114
115#[derive(Debug, Clone, PartialEq, Eq)]
116pub struct RelayEnvelope {
117    pub envelope_id: String,
118    pub channel_id: String,
119    pub payload: Vec<u8>,
120    pub deliver_after: DateTime<Utc>,
121    pub expires_at: DateTime<Utc>,
122}
123
124#[derive(Debug, Clone, PartialEq, Eq)]
125pub struct IdempotencyKey {
126    pub key: String,
127    pub scope: String,
128    pub created_at: DateTime<Utc>,
129}
130
131#[derive(Debug, Clone)]
132pub struct FederationOutboxInsert<'a> {
133    pub outbox_id: &'a str,
134    pub destination: &'a str,
135    pub endpoint: &'a str,
136    pub payload: &'a serde_json::Value,
137    pub public_key: &'a [u8; 32],
138    pub next_attempt_at: DateTime<Utc>,
139}
140
141#[derive(Debug, Clone, PartialEq, Eq)]
142pub struct FederationOutboxMessage {
143    pub outbox_id: String,
144    pub destination: String,
145    pub endpoint: String,
146    pub payload: serde_json::Value,
147    pub public_key: [u8; 32],
148    pub attempts: i32,
149    pub next_attempt_at: DateTime<Utc>,
150    pub last_error: Option<String>,
151}
152
153#[derive(Debug, Clone, PartialEq, Eq)]
154pub struct PresenceSnapshot {
155    pub entity: String,
156    pub state: String,
157    pub expires_at: DateTime<Utc>,
158    pub user_id: Option<String>,
159    pub handle: Option<String>,
160    pub display_name: Option<String>,
161    pub avatar_url: Option<String>,
162}
163
164fn presence_user_payload(snapshot: &PresenceSnapshot) -> Option<serde_json::Value> {
165    snapshot.user_id.as_ref().map(|id| {
166        let user_id = id.clone();
167        serde_json::json!({
168            "id": user_id.clone(),
169            "user_id": user_id,
170            "handle": snapshot.handle.clone(),
171            "display_name": snapshot.display_name.clone(),
172            "avatar_url": snapshot.avatar_url.clone(),
173        })
174    })
175}
176
177fn presence_user_fields(
178    map: &serde_json::Map<String, serde_json::Value>,
179) -> (
180    Option<String>,
181    Option<String>,
182    Option<String>,
183    Option<String>,
184) {
185    let user_id = map
186        .get("user_id")
187        .or_else(|| map.get("id"))
188        .and_then(|v| v.as_str())
189        .map(|v| v.to_string());
190    let handle = map
191        .get("handle")
192        .and_then(|v| v.as_str())
193        .map(|v| v.to_string());
194    let display_name = map
195        .get("display_name")
196        .and_then(|v| v.as_str())
197        .map(|v| v.to_string());
198    let avatar_url = map
199        .get("avatar_url")
200        .and_then(|v| v.as_str())
201        .map(|v| v.to_string());
202    (user_id, handle, display_name, avatar_url)
203}
204
205#[derive(Debug, Clone, PartialEq, Eq)]
206pub struct DeviceKeyEvent {
207    pub event_id: String,
208    pub device_id: String,
209    pub public_key: Vec<u8>,
210    pub recorded_at: DateTime<Utc>,
211}
212
213#[derive(Debug, Clone, PartialEq, Eq)]
214pub struct FriendRequest {
215    pub id: String,
216    pub from_user_id: String,
217    pub to_user_id: String,
218    pub status: String, // 'pending', 'accepted', 'rejected'
219    pub message: Option<String>,
220    pub created_at: DateTime<Utc>,
221    pub updated_at: DateTime<Utc>,
222}
223
224#[derive(Debug, Clone, PartialEq, Eq)]
225pub struct DeviceRotationAudit {
226    pub rotation_id: String,
227    pub device_id: String,
228    pub old_public_key: Vec<u8>,
229    pub new_public_key: Vec<u8>,
230    pub signature: Vec<u8>,
231    pub nonce: Option<Vec<u8>>,
232    pub proof_expires_at: DateTime<Utc>,
233    pub applied_at: DateTime<Utc>,
234}
235
236#[derive(Debug, Clone)]
237pub struct DeviceRotationRecord<'a> {
238    pub rotation_id: &'a str,
239    pub device_id: &'a str,
240    pub user_id: &'a str,
241    pub old_public_key: &'a [u8],
242    pub new_public_key: &'a [u8],
243    pub signature: &'a [u8],
244    pub nonce: Option<&'a [u8]>,
245    pub proof_expires_at: DateTime<Utc>,
246    pub applied_at: DateTime<Utc>,
247    pub event_id: &'a str,
248}
249
250#[derive(Debug, Clone, PartialEq, Eq)]
251pub struct PairingTokenIssued {
252    pub pair_code: String,
253    pub issued_at: DateTime<Utc>,
254    pub expires_at: DateTime<Utc>,
255}
256
257#[derive(Debug, Clone, PartialEq, Eq)]
258pub struct PairingClaimResult {
259    pub user: UserProfile,
260    pub issuer_device_id: String,
261}
262
263/// Remote user profile cached from another server
264#[derive(Debug, Clone, PartialEq, Eq)]
265pub struct RemoteUserProfile {
266    pub user_id: String,
267    pub domain: String,
268    pub handle: String,
269    pub display_name: Option<String>,
270    pub avatar_url: Option<String>,
271    pub profile_data: serde_json::Value,
272    pub cached_at: DateTime<Utc>,
273    pub expires_at: DateTime<Utc>,
274    pub last_fetched_at: DateTime<Utc>,
275}
276
277/// Federation peer connection status tracking
278#[derive(Debug, Clone, PartialEq, Eq)]
279pub struct FederationPeerConnectionStatus {
280    pub domain: String,
281    pub endpoint: String,
282    pub public_key: Vec<u8>,
283    pub last_seen_at: Option<DateTime<Utc>>,
284    pub last_error: Option<String>,
285    pub error_count: i32,
286    pub status: String, // unknown, online, offline, unreachable
287    pub created_at: DateTime<Utc>,
288    pub updated_at: DateTime<Utc>,
289}
290
291/// Federated friend request (cross-server)
292#[derive(Debug, Clone, PartialEq, Eq)]
293pub struct FederatedFriendRequest {
294    pub request_id: String,
295    pub from_user_id: String, // full ID: user@domain
296    pub to_user_id: String,   // full ID: user@domain
297    pub from_domain: String,
298    pub to_domain: String,
299    pub message: Option<String>,
300    pub status: String, // pending, accepted, rejected, cancelled
301    pub federation_event_id: Option<String>,
302    pub created_at: DateTime<Utc>,
303    pub updated_at: DateTime<Utc>,
304}
305
306#[derive(Debug, Clone, PartialEq, Eq)]
307pub struct ChatGroup {
308    pub group_id: String,
309    pub owner_device: String,
310    pub created_at: DateTime<Utc>,
311}
312
313#[derive(Debug, Clone, PartialEq, Eq)]
314pub enum GroupRole {
315    Owner,
316    Admin,
317    Member,
318}
319
320impl GroupRole {
321    fn as_str(&self) -> &'static str {
322        match self {
323            GroupRole::Owner => "owner",
324            GroupRole::Admin => "admin",
325            GroupRole::Member => "member",
326        }
327    }
328}
329
330impl FromStr for GroupRole {
331    type Err = StorageError;
332
333    fn from_str(value: &str) -> Result<Self, Self::Err> {
334        match value {
335            "owner" => Ok(GroupRole::Owner),
336            "admin" => Ok(GroupRole::Admin),
337            "member" => Ok(GroupRole::Member),
338            _ => Err(StorageError::Serialization),
339        }
340    }
341}
342
343#[derive(Debug, Clone, PartialEq, Eq)]
344pub struct GroupMember {
345    pub group_id: String,
346    pub device_id: String,
347    pub role: GroupRole,
348    pub joined_at: DateTime<Utc>,
349}
350
351#[derive(Debug, Clone, PartialEq, Eq)]
352pub enum FederationPeerStatus {
353    Active,
354    Pending,
355    Blocked,
356}
357
358impl FederationPeerStatus {
359    fn as_str(&self) -> &'static str {
360        match self {
361            FederationPeerStatus::Active => "active",
362            FederationPeerStatus::Pending => "pending",
363            FederationPeerStatus::Blocked => "blocked",
364        }
365    }
366}
367
368impl FromStr for FederationPeerStatus {
369    type Err = StorageError;
370
371    fn from_str(value: &str) -> Result<Self, Self::Err> {
372        match value {
373            "active" => Ok(FederationPeerStatus::Active),
374            "pending" => Ok(FederationPeerStatus::Pending),
375            "blocked" => Ok(FederationPeerStatus::Blocked),
376            _ => Err(StorageError::Serialization),
377        }
378    }
379}
380
381#[derive(Debug, Clone, PartialEq, Eq)]
382pub struct FederationPeerRecord {
383    pub domain: String,
384    pub endpoint: String,
385    pub public_key: [u8; 32],
386    pub status: FederationPeerStatus,
387    pub updated_at: DateTime<Utc>,
388}
389
390#[derive(Debug, Clone, PartialEq, Eq)]
391pub struct InboxOffset {
392    pub entity_id: String,
393    pub channel_id: String,
394    pub last_envelope_id: Option<String>,
395    pub updated_at: DateTime<Utc>,
396}
397
398/// Establishes connectivity to PostgreSQL and Redis backends.
399pub async fn connect(postgres_dsn: &str, redis_url: &str) -> Result<Storage, StorageError> {
400    let (client, connection) = tokio_postgres::connect(postgres_dsn, NoTls)
401        .await
402        .map_err(|_| StorageError::Postgres)?;
403    let task = tokio::spawn(async move {
404        if let Err(error) = connection.await {
405            tracing::error!("postgres connection stopped: {}", error);
406        }
407    });
408    let redis_client = redis::Client::open(redis_url).map_err(|_| StorageError::Redis)?;
409    let redis_connection = redis_client
410        .get_multiplexed_async_connection()
411        .await
412        .map_err(|_| StorageError::Redis)?;
413    Ok(Storage {
414        client,
415        _pg_task: task,
416        redis: Arc::new(Mutex::new(redis_connection)),
417    })
418}
419
420impl Storage {
421    /// Applies bundled migrations to PostgreSQL.
422    pub async fn migrate(&self) -> Result<(), StorageError> {
423        self.client
424            .batch_execute(INIT_SQL)
425            .await
426            .map_err(|_| StorageError::Postgres)?;
427        self.client
428            .batch_execute(PAIRING_SQL)
429            .await
430            .map_err(|_| StorageError::Postgres)?;
431        self.client
432            .batch_execute(USER_BLOB_SQL)
433            .await
434            .map_err(|_| StorageError::Postgres)?;
435        self.client
436            .batch_execute(SERVER_SECRET_SQL)
437            .await
438            .map_err(|_| StorageError::Postgres)?;
439        self.client
440            .batch_execute(DEVICE_ROTATION_SQL)
441            .await
442            .map_err(|_| StorageError::Postgres)?;
443        self.client
444            .batch_execute(FEDERATION_OUTBOX_SQL)
445            .await
446            .map_err(|_| StorageError::Postgres)?;
447        self.client
448            .batch_execute(FRIEND_REQUESTS_SQL)
449            .await
450            .map_err(|_| StorageError::Postgres)?;
451        self.client
452            .batch_execute(FEDERATED_IDENTIFIERS_SQL)
453            .await
454            .map_err(|_| StorageError::Postgres)?;
455        self.client
456            .batch_execute(DEVICE_PQ_SQL)
457            .await
458            .map_err(|_| StorageError::Postgres)?;
459        Ok(())
460    }
461
462    /// Executes lightweight probes across PostgreSQL and Redis.
463    pub async fn readiness(&self) -> Result<(), StorageError> {
464        self.client
465            .simple_query("SELECT 1")
466            .await
467            .map_err(|_| StorageError::Postgres)?;
468        let mut conn = self.redis.lock().await;
469        let _: String = redis::cmd("PING")
470            .query_async::<String>(&mut *conn)
471            .await
472            .map_err(|_| StorageError::Redis)?;
473        Ok(())
474    }
475
476    /// Reads a user-scoped blob entry.
477    pub async fn read_user_blob(
478        &self,
479        user_id: &str,
480        key: &str,
481    ) -> Result<Option<String>, StorageError> {
482        let row = self
483            .client
484            .query_opt(
485                "SELECT payload FROM user_blob WHERE user_id = $1 AND key = $2",
486                &[&user_id, &key],
487            )
488            .await
489            .map_err(|_| StorageError::Postgres)?;
490        Ok(row.map(|row| row.get(0)))
491    }
492
493    /// Upserts a user-scoped blob entry.
494    pub async fn write_user_blob(
495        &self,
496        user_id: &str,
497        key: &str,
498        payload: &str,
499    ) -> Result<(), StorageError> {
500        let now = Utc::now();
501        self.client
502            .execute(
503                "INSERT INTO user_blob (user_id, key, payload, updated_at) VALUES ($1, $2, $3, $4)
504                ON CONFLICT (user_id, key) DO UPDATE SET payload = excluded.payload, updated_at = excluded.updated_at",
505                &[&user_id, &key, &payload, &now],
506            )
507            .await
508            .map_err(|_| StorageError::Postgres)?;
509        Ok(())
510    }
511
512    /// Inserts a server-scoped secret version.
513    pub async fn insert_server_secret(
514        &self,
515        record: &ServerSecretRecord,
516    ) -> Result<(), StorageError> {
517        let query = "INSERT INTO server_secret (name, version, secret, public, metadata, created_at, valid_after, rotates_at, expires_at)
518            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)";
519        self.client
520            .execute(
521                query,
522                &[
523                    &record.name,
524                    &record.version,
525                    &record.secret,
526                    &record.public,
527                    &record.metadata,
528                    &record.created_at,
529                    &record.valid_after,
530                    &record.rotates_at,
531                    &record.expires_at,
532                ],
533            )
534            .await
535            .map_err(|_| StorageError::Postgres)?;
536        Ok(())
537    }
538
539    /// Loads active secret material for a given name.
540    pub async fn active_server_secrets(
541        &self,
542        name: &str,
543        moment: DateTime<Utc>,
544    ) -> Result<Vec<ServerSecretRecord>, StorageError> {
545        let query = "SELECT name, version, secret, public, metadata, created_at, valid_after, rotates_at, expires_at
546            FROM server_secret
547            WHERE name = $1 AND valid_after <= $2 AND expires_at > $2
548            ORDER BY version DESC";
549        let rows = self
550            .client
551            .query(query, &[&name, &moment])
552            .await
553            .map_err(|_| StorageError::Postgres)?;
554        Ok(rows
555            .into_iter()
556            .map(|row| ServerSecretRecord {
557                name: row.get(0),
558                version: row.get(1),
559                secret: row.get(2),
560                public: row.get(3),
561                metadata: row.get(4),
562                created_at: row.get(5),
563                valid_after: row.get(6),
564                rotates_at: row.get(7),
565                expires_at: row.get(8),
566            })
567            .collect())
568    }
569
570    /// Returns the most recent version number for a secret name.
571    pub async fn latest_server_secret_version(&self, name: &str) -> Result<i64, StorageError> {
572        let row = self
573            .client
574            .query_one(
575                "SELECT COALESCE(MAX(version), 0) FROM server_secret WHERE name = $1",
576                &[&name],
577            )
578            .await
579            .map_err(|_| StorageError::Postgres)?;
580        Ok(row.get(0))
581    }
582
583    /// Deletes expired secret versions.
584    pub async fn delete_expired_server_secrets(
585        &self,
586        name: &str,
587        threshold: DateTime<Utc>,
588    ) -> Result<u64, StorageError> {
589        let affected = self
590            .client
591            .execute(
592                "DELETE FROM server_secret WHERE name = $1 AND expires_at <= $2",
593                &[&name, &threshold],
594            )
595            .await
596            .map_err(|_| StorageError::Postgres)?;
597        Ok(affected)
598    }
599
600    /// Creates a short-lived pairing code bound to an issuer device.
601    pub async fn create_pairing_token(
602        &self,
603        user_id: &str,
604        issuer_device_id: &str,
605        ttl_seconds: i64,
606    ) -> Result<PairingTokenIssued, StorageError> {
607        let issuer = self.load_device(issuer_device_id).await?;
608        if issuer.user_id != user_id || issuer.status != "active" {
609            return Err(StorageError::Invalid);
610        }
611        let ttl = ttl_seconds.clamp(60, 3600);
612        let issued_at = Utc::now();
613        let expires_at = issued_at + Duration::seconds(ttl);
614        for _ in 0..16 {
615            let pair_code = generate_pair_code();
616            let inserted = self
617                .client
618                .execute(
619                    "INSERT INTO device_pairing (pair_code, user_id, issuer_device_id, issued_at, expires_at)
620                    VALUES ($1, $2, $3, $4, $5)
621                    ON CONFLICT (pair_code) DO NOTHING",
622                    &[&pair_code, &user_id, &issuer_device_id, &issued_at, &expires_at],
623                )
624                .await
625                .map_err(|_| StorageError::Postgres)?;
626            if inserted == 1 {
627                return Ok(PairingTokenIssued {
628                    pair_code,
629                    issued_at,
630                    expires_at,
631                });
632            }
633        }
634        Err(StorageError::Postgres)
635    }
636
637    /// Registers or rotates a device key.
638    pub async fn upsert_device(&self, record: &DeviceRecord) -> Result<(), StorageError> {
639        let query = "INSERT INTO user_device (opaque_id, user_id, pubkey, status, created_at) VALUES ($1, $2, $3, $4, $5)
640            ON CONFLICT (opaque_id) DO UPDATE SET pubkey = excluded.pubkey, status = excluded.status
641            WHERE user_device.user_id = excluded.user_id";
642        self.client
643            .execute(
644                query,
645                &[
646                    &record.device_id,
647                    &record.user_id,
648                    &record.public_key,
649                    &record.status,
650                    &record.created_at,
651                ],
652            )
653            .await
654            .map_err(|_| StorageError::Postgres)?;
655        Ok(())
656    }
657
658    /// Persists an audit trail entry for device key material.
659    pub async fn record_device_key_event(
660        &self,
661        event: &DeviceKeyEvent,
662    ) -> Result<(), StorageError> {
663        let query = "INSERT INTO device_key_event (event_id, device_id, public_key, recorded_at) VALUES ($1, $2, $3, $4)";
664        self.client
665            .execute(
666                query,
667                &[
668                    &event.event_id,
669                    &event.device_id,
670                    &event.public_key,
671                    &event.recorded_at,
672                ],
673            )
674            .await
675            .map_err(|_| StorageError::Postgres)?;
676        Ok(())
677    }
678
679    /// Applies a device key rotation atomically.
680    pub async fn apply_device_key_rotation(
681        &self,
682        rotation: &DeviceRotationRecord<'_>,
683    ) -> Result<(), StorageError> {
684        self.client
685            .batch_execute("BEGIN")
686            .await
687            .map_err(|_| StorageError::Postgres)?;
688        let update_result = self
689            .client
690            .execute(
691                "UPDATE user_device SET pubkey = $1 WHERE opaque_id = $2 AND user_id = $3",
692                &[
693                    &rotation.new_public_key,
694                    &rotation.device_id,
695                    &rotation.user_id,
696                ],
697            )
698            .await;
699        let updated = match update_result {
700            Ok(value) => value,
701            Err(_) => {
702                let _ = self.client.batch_execute("ROLLBACK").await;
703                return Err(StorageError::Postgres);
704            }
705        };
706        if updated != 1 {
707            let _ = self.client.batch_execute("ROLLBACK").await;
708            return Err(StorageError::Missing);
709        }
710        if self
711            .client
712            .execute(
713                "INSERT INTO device_key_event (event_id, device_id, public_key, recorded_at) VALUES ($1, $2, $3, $4)",
714                &[&rotation.event_id, &rotation.device_id, &rotation.new_public_key, &rotation.applied_at],
715            )
716            .await
717            .is_err()
718        {
719            let _ = self.client.batch_execute("ROLLBACK").await;
720            return Err(StorageError::Postgres);
721        }
722        if self
723            .client
724            .execute(
725                "INSERT INTO device_rotation_audit (rotation_id, device_id, old_public_key, new_public_key, signature, nonce, proof_expires_at, applied_at)
726                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
727                &[
728                    &rotation.rotation_id,
729                    &rotation.device_id,
730                    &rotation.old_public_key,
731                    &rotation.new_public_key,
732                    &rotation.signature,
733                    &rotation.nonce,
734                    &rotation.proof_expires_at,
735                    &rotation.applied_at,
736                ],
737            )
738            .await
739            .is_err()
740        {
741            let _ = self.client.batch_execute("ROLLBACK").await;
742            return Err(StorageError::Postgres);
743        }
744        if self.client.batch_execute("COMMIT").await.is_err() {
745            let _ = self.client.batch_execute("ROLLBACK").await;
746            return Err(StorageError::Postgres);
747        }
748        Ok(())
749    }
750
751    /// Claims a pairing token and registers a new device for the associated user.
752    pub async fn claim_pairing_token(
753        &self,
754        pair_code: &str,
755        device_id: &str,
756        public_key: &[u8],
757    ) -> Result<PairingClaimResult, StorageError> {
758        let recorded_at = Utc::now();
759        let event_id = format!(
760            "pair:{}:{}",
761            device_id,
762            recorded_at.timestamp_nanos_opt().unwrap_or_default()
763        );
764        let stmt = "WITH selected AS (
765                SELECT user_id, issuer_device_id, expires_at, redeemed_at, attempts
766                FROM device_pairing
767                WHERE pair_code = $1
768                FOR UPDATE
769            ),
770            validated AS (
771                SELECT user_id, issuer_device_id
772                FROM selected
773                WHERE expires_at > now()
774                  AND redeemed_at IS NULL
775                  AND attempts < $6
776            ),
777            updated AS (
778                UPDATE device_pairing
779                SET redeemed_at = $5,
780                    redeemed_device_id = $2,
781                    public_key = $3,
782                    attempts = LEAST(attempts + 1, $6)
783                WHERE pair_code = $1
784                  AND EXISTS (SELECT 1 FROM validated)
785                RETURNING user_id, issuer_device_id
786            ),
787            inserted AS (
788                INSERT INTO user_device (opaque_id, user_id, pubkey, status, created_at)
789                SELECT $2, user_id, $3, 'active', $5 FROM validated
790                RETURNING user_id
791            ),
792            events AS (
793                INSERT INTO device_key_event (event_id, device_id, public_key, recorded_at)
794                SELECT $4, $2, $3, $5 FROM inserted
795            )
796            SELECT user_id, issuer_device_id FROM updated";
797        let result = self
798            .client
799            .query_opt(
800                stmt,
801                &[
802                    &pair_code,
803                    &device_id,
804                    &public_key,
805                    &event_id,
806                    &recorded_at,
807                    &PAIRING_MAX_ATTEMPTS,
808                ],
809            )
810            .await
811            .map_err(|_| StorageError::Postgres)?;
812        let (user_id, issuer_device_id) = match result {
813            Some(row) => {
814                let user_id: String = row.get(0);
815                let issuer_device_id: String = row.get(1);
816                (user_id, issuer_device_id)
817            }
818            None => {
819                let exists = self
820                    .client
821                    .query_opt(
822                        "SELECT 1 FROM device_pairing WHERE pair_code = $1",
823                        &[&pair_code],
824                    )
825                    .await
826                    .map_err(|_| StorageError::Postgres)?;
827                if exists.is_some() {
828                    self
829                        .client
830                        .execute(
831                            "UPDATE device_pairing SET attempts = LEAST(attempts + 1, $2) WHERE pair_code = $1",
832                            &[&pair_code, &PAIRING_MAX_ATTEMPTS],
833                        )
834                        .await
835                        .map_err(|_| StorageError::Postgres)?;
836                    return Err(StorageError::Invalid);
837                }
838                return Err(StorageError::Missing);
839            }
840        };
841        let profile = self.load_user(&user_id).await?;
842        Ok(PairingClaimResult {
843            user: profile,
844            issuer_device_id,
845        })
846    }
847
848    /// Removes expired or exhausted pairing tokens.
849    pub async fn invalidate_expired_pairings(&self) -> Result<u64, StorageError> {
850        let affected = self
851            .client
852            .execute(
853                "DELETE FROM device_pairing WHERE expires_at <= now() OR attempts >= $1 OR (redeemed_at IS NOT NULL AND redeemed_at <= now() - interval '1 day')",
854                &[&PAIRING_MAX_ATTEMPTS],
855            )
856            .await
857            .map_err(|_| StorageError::Postgres)?;
858        Ok(affected)
859    }
860
861    /// Fetches the newest device key event for a device identifier.
862    pub async fn latest_device_key_event(
863        &self,
864        device_id: &str,
865    ) -> Result<Option<DeviceKeyEvent>, StorageError> {
866        let query = "SELECT event_id, device_id, public_key, recorded_at
867            FROM device_key_event WHERE device_id = $1 ORDER BY recorded_at DESC LIMIT 1";
868        let row = self
869            .client
870            .query_opt(query, &[&device_id])
871            .await
872            .map_err(|_| StorageError::Postgres)?;
873        Ok(row.map(|row| DeviceKeyEvent {
874            event_id: row.get(0),
875            device_id: row.get(1),
876            public_key: row.get(2),
877            recorded_at: row.get(3),
878        }))
879    }
880
881    /// Creates a session binding a device to a TLS fingerprint.
882    pub async fn record_session(&self, session: &SessionRecord) -> Result<(), StorageError> {
883        let query =
884            "INSERT INTO session (opaque_id, user_id, device_id, tls_fingerprint, created_at, ttl_seconds)
885            VALUES ($1, $2, $3, $4, $5, $6)";
886        self.client
887            .execute(
888                query,
889                &[
890                    &session.session_id,
891                    &session.user_id,
892                    &session.device_id,
893                    &session.tls_fingerprint,
894                    &session.created_at,
895                    &session.ttl_seconds,
896                ],
897            )
898            .await
899            .map_err(|_| StorageError::Postgres)?;
900        Ok(())
901    }
902
903    /// Loads a persisted session by identifier.
904    pub async fn load_session(&self, session_id: &str) -> Result<SessionRecord, StorageError> {
905        let row = self
906            .client
907            .query_opt(
908                "SELECT opaque_id, user_id, device_id, tls_fingerprint, created_at, ttl_seconds FROM session WHERE opaque_id = $1",
909                &[&session_id],
910            )
911            .await
912            .map_err(|_| StorageError::Postgres)?;
913        let row = row.ok_or(StorageError::Missing)?;
914        Ok(SessionRecord {
915            session_id: row.get(0),
916            user_id: row.get(1),
917            device_id: row.get(2),
918            tls_fingerprint: row.get(3),
919            created_at: row.get(4),
920            ttl_seconds: row.get(5),
921        })
922    }
923
924    /// Fetches device metadata by identifier.
925    pub async fn load_device(&self, device_id: &str) -> Result<DeviceRecord, StorageError> {
926        let row = self
927            .client
928            .query_opt(
929                "SELECT opaque_id, user_id, pubkey, status, created_at FROM user_device WHERE opaque_id = $1",
930                &[&device_id],
931            )
932            .await
933            .map_err(|_| StorageError::Postgres)?;
934        let row = row.ok_or(StorageError::Missing)?;
935        Ok(DeviceRecord {
936            device_id: row.get(0),
937            user_id: row.get(1),
938            public_key: row.get(2),
939            status: row.get(3),
940            created_at: row.get(4),
941        })
942    }
943
944    /// Loads stored post-quantum key material for a device when available.
945    pub async fn load_device_pq_keys(
946        &self,
947        device_id: &str,
948    ) -> Result<Option<DevicePqKeys>, StorageError> {
949        let row = self
950            .client
951            .query_opt(
952                "SELECT device_id, kem_public, signature_public, updated_at FROM device_pq_keys WHERE device_id = $1",
953                &[&device_id],
954            )
955            .await
956            .map_err(|_| StorageError::Postgres)?;
957        Ok(row.map(|row| DevicePqKeys {
958            device_id: row.get(0),
959            kem_public: row.get(1),
960            signature_public: row.get(2),
961            updated_at: row.get(3),
962        }))
963    }
964
965    /// Upserts post-quantum key material for a device.
966    pub async fn upsert_device_pq_keys(&self, keys: &DevicePqKeys) -> Result<(), StorageError> {
967        self.client
968            .execute(
969                "INSERT INTO device_pq_keys (device_id, kem_public, signature_public, updated_at)
970                 VALUES ($1, $2, $3, $4)
971                 ON CONFLICT (device_id) DO UPDATE
972                 SET kem_public = EXCLUDED.kem_public,
973                     signature_public = EXCLUDED.signature_public,
974                     updated_at = EXCLUDED.updated_at",
975                &[
976                    &keys.device_id,
977                    &keys.kem_public,
978                    &keys.signature_public,
979                    &keys.updated_at,
980                ],
981            )
982            .await
983            .map_err(|_| StorageError::Postgres)?;
984        Ok(())
985    }
986
987    /// Counts active devices registered for a user.
988    pub async fn count_active_devices(&self, user_id: &str) -> Result<i64, StorageError> {
989        let row = self
990            .client
991            .query_one(
992                "SELECT COUNT(*) FROM user_device WHERE user_id = $1 AND status = 'active'",
993                &[&user_id],
994            )
995            .await
996            .map_err(|_| StorageError::Postgres)?;
997        Ok(row.get(0))
998    }
999
1000    /// Lists devices associated with a user ordered by creation time.
1001    pub async fn list_devices_for_user(
1002        &self,
1003        user_id: &str,
1004    ) -> Result<Vec<DeviceRecord>, StorageError> {
1005        let rows = self
1006            .client
1007            .query(
1008                "SELECT opaque_id, user_id, pubkey, status, created_at FROM user_device WHERE user_id = $1 ORDER BY created_at ASC",
1009                &[&user_id],
1010            )
1011            .await
1012            .map_err(|_| StorageError::Postgres)?;
1013        Ok(rows
1014            .into_iter()
1015            .map(|row| DeviceRecord {
1016                device_id: row.get(0),
1017                user_id: row.get(1),
1018                public_key: row.get(2),
1019                status: row.get(3),
1020                created_at: row.get(4),
1021            })
1022            .collect())
1023    }
1024
1025    /// Marks a device as active.
1026    pub async fn activate_device(&self, device_id: &str) -> Result<(), StorageError> {
1027        self.update_device_status(device_id, "active").await
1028    }
1029
1030    /// Marks a device as revoked.
1031    pub async fn deactivate_device(&self, device_id: &str) -> Result<(), StorageError> {
1032        self.update_device_status(device_id, "revoked").await
1033    }
1034
1035    async fn update_device_status(
1036        &self,
1037        device_id: &str,
1038        status: &str,
1039    ) -> Result<(), StorageError> {
1040        let affected = self
1041            .client
1042            .execute(
1043                "UPDATE user_device SET status = $2 WHERE opaque_id = $1",
1044                &[&device_id, &status],
1045            )
1046            .await
1047            .map_err(|_| StorageError::Postgres)?;
1048        if affected == 0 {
1049            return Err(StorageError::Missing);
1050        }
1051        Ok(())
1052    }
1053
1054    /// Creates a new user profile entry.
1055    pub async fn create_user(&self, profile: &NewUserProfile) -> Result<UserProfile, StorageError> {
1056        let now = Utc::now();
1057        let row = self
1058            .client
1059            .query_one(
1060                "INSERT INTO app_user (user_id, handle, domain, display_name, avatar_url, created_at, updated_at)
1061                VALUES ($1, $2, $3, $4, $5, $6, $6)
1062                RETURNING user_id, handle, domain, display_name, avatar_url, created_at, updated_at",
1063                &[
1064                    &profile.user_id,
1065                    &profile.handle,
1066                    &profile.domain,
1067                    &profile.display_name,
1068                    &profile.avatar_url,
1069                    &now,
1070                ],
1071            )
1072            .await
1073            .map_err(|_| StorageError::Postgres)?;
1074        Ok(UserProfile {
1075            user_id: row.get(0),
1076            handle: row.get(1),
1077            domain: row.get(2),
1078            display_name: row.get(3),
1079            avatar_url: row.get(4),
1080            created_at: row.get(5),
1081            updated_at: row.get(6),
1082        })
1083    }
1084
1085    /// Loads a user profile by identifier.
1086    pub async fn load_user(&self, user_id: &str) -> Result<UserProfile, StorageError> {
1087        let row = self
1088            .client
1089            .query_opt(
1090                "SELECT user_id, handle, domain, display_name, avatar_url, created_at, updated_at FROM app_user WHERE user_id = $1",
1091                &[&user_id],
1092            )
1093            .await
1094            .map_err(|_| StorageError::Postgres)?;
1095        let row = row.ok_or(StorageError::Missing)?;
1096        Ok(UserProfile {
1097            user_id: row.get(0),
1098            handle: row.get(1),
1099            domain: row.get(2),
1100            display_name: row.get(3),
1101            avatar_url: row.get(4),
1102            created_at: row.get(5),
1103            updated_at: row.get(6),
1104        })
1105    }
1106
1107    /// Loads a user profile by handle.
1108    pub async fn load_user_by_handle(&self, handle: &str) -> Result<UserProfile, StorageError> {
1109        let row = self
1110            .client
1111            .query_opt(
1112                "SELECT user_id, handle, domain, display_name, avatar_url, created_at, updated_at FROM app_user WHERE handle = $1",
1113                &[&handle],
1114            )
1115            .await
1116            .map_err(|_| StorageError::Postgres)?;
1117        let row = row.ok_or(StorageError::Missing)?;
1118        Ok(UserProfile {
1119            user_id: row.get(0),
1120            handle: row.get(1),
1121            domain: row.get(2),
1122            display_name: row.get(3),
1123            avatar_url: row.get(4),
1124            created_at: row.get(5),
1125            updated_at: row.get(6),
1126        })
1127    }
1128
1129    /// Applies partial updates to user profile metadata.
1130    pub async fn update_user_profile(
1131        &self,
1132        user_id: &str,
1133        display_name: Option<&str>,
1134        avatar_url: Option<&str>,
1135    ) -> Result<(), StorageError> {
1136        let now = Utc::now();
1137        let affected = self
1138            .client
1139            .execute(
1140                "UPDATE app_user SET display_name = COALESCE($2, display_name), avatar_url = COALESCE($3, avatar_url), updated_at = $4 WHERE user_id = $1",
1141                &[&user_id, &display_name, &avatar_url, &now],
1142            )
1143            .await
1144            .map_err(|_| StorageError::Postgres)?;
1145        if affected == 0 {
1146            return Err(StorageError::Missing);
1147        }
1148        Ok(())
1149    }
1150
1151    /// Updates user avatar URL
1152    pub async fn update_user_avatar(
1153        &self,
1154        user_id: &str,
1155        avatar_url: &str,
1156    ) -> Result<(), StorageError> {
1157        let now = Utc::now();
1158        let affected = self
1159            .client
1160            .execute(
1161                "UPDATE app_user SET avatar_url = $2, updated_at = $3 WHERE user_id = $1",
1162                &[&user_id, &avatar_url, &now],
1163            )
1164            .await
1165            .map_err(|_| StorageError::Postgres)?;
1166        if affected == 0 {
1167            return Err(StorageError::Missing);
1168        }
1169        Ok(())
1170    }
1171
1172    /// Loads user by handle@domain
1173    pub async fn load_user_by_federated_id(
1174        &self,
1175        handle: &str,
1176        domain: &str,
1177    ) -> Result<UserProfile, StorageError> {
1178        let row = self
1179            .client
1180            .query_opt(
1181                "SELECT user_id, handle, domain, display_name, avatar_url, created_at, updated_at 
1182                FROM app_user WHERE handle = $1 AND domain = $2",
1183                &[&handle, &domain],
1184            )
1185            .await
1186            .map_err(|_| StorageError::Postgres)?;
1187        let row = row.ok_or(StorageError::Missing)?;
1188        Ok(UserProfile {
1189            user_id: row.get(0),
1190            handle: row.get(1),
1191            domain: row.get(2),
1192            display_name: row.get(3),
1193            avatar_url: row.get(4),
1194            created_at: row.get(5),
1195            updated_at: row.get(6),
1196        })
1197    }
1198
1199    /// Caches remote user profile
1200    pub async fn cache_remote_user(&self, profile: &RemoteUserProfile) -> Result<(), StorageError> {
1201        self.client
1202            .execute(
1203                "INSERT INTO remote_user_cache 
1204                (user_id, domain, handle, display_name, avatar_url, profile_data, cached_at, expires_at, last_fetched_at)
1205                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
1206                ON CONFLICT (user_id) DO UPDATE SET
1207                    display_name = EXCLUDED.display_name,
1208                    avatar_url = EXCLUDED.avatar_url,
1209                    profile_data = EXCLUDED.profile_data,
1210                    last_fetched_at = EXCLUDED.last_fetched_at,
1211                    expires_at = EXCLUDED.expires_at",
1212                &[
1213                    &profile.user_id,
1214                    &profile.domain,
1215                    &profile.handle,
1216                    &profile.display_name,
1217                    &profile.avatar_url,
1218                    &profile.profile_data,
1219                    &profile.cached_at,
1220                    &profile.expires_at,
1221                    &profile.last_fetched_at,
1222                ],
1223            )
1224            .await
1225            .map_err(|_| StorageError::Postgres)?;
1226        Ok(())
1227    }
1228
1229    /// Loads cached remote user profile
1230    pub async fn load_remote_user_cache(
1231        &self,
1232        user_id: &str,
1233    ) -> Result<Option<RemoteUserProfile>, StorageError> {
1234        let row = self
1235            .client
1236            .query_opt(
1237                "SELECT user_id, domain, handle, display_name, avatar_url, profile_data, 
1238                cached_at, expires_at, last_fetched_at
1239                FROM remote_user_cache WHERE user_id = $1 AND expires_at > now()",
1240                &[&user_id],
1241            )
1242            .await
1243            .map_err(|_| StorageError::Postgres)?;
1244
1245        Ok(row.map(|row| RemoteUserProfile {
1246            user_id: row.get(0),
1247            domain: row.get(1),
1248            handle: row.get(2),
1249            display_name: row.get(3),
1250            avatar_url: row.get(4),
1251            profile_data: row.get(5),
1252            cached_at: row.get(6),
1253            expires_at: row.get(7),
1254            last_fetched_at: row.get(8),
1255        }))
1256    }
1257
1258    /// Updates federation peer status
1259    pub async fn update_federation_peer_status(
1260        &self,
1261        status: &FederationPeerConnectionStatus,
1262    ) -> Result<(), StorageError> {
1263        self.client
1264            .execute(
1265                "INSERT INTO federation_peer_status 
1266                (domain, endpoint, public_key, last_seen_at, last_error, error_count, status, created_at, updated_at)
1267                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
1268                ON CONFLICT (domain) DO UPDATE SET
1269                    endpoint = EXCLUDED.endpoint,
1270                    public_key = EXCLUDED.public_key,
1271                    last_seen_at = EXCLUDED.last_seen_at,
1272                    last_error = EXCLUDED.last_error,
1273                    error_count = EXCLUDED.error_count,
1274                    status = EXCLUDED.status,
1275                    updated_at = EXCLUDED.updated_at",
1276                &[
1277                    &status.domain,
1278                    &status.endpoint,
1279                    &status.public_key,
1280                    &status.last_seen_at,
1281                    &status.last_error,
1282                    &status.error_count,
1283                    &status.status,
1284                    &status.created_at,
1285                    &status.updated_at,
1286                ],
1287            )
1288            .await
1289            .map_err(|_| StorageError::Postgres)?;
1290        Ok(())
1291    }
1292
1293    /// Creates federated friend request
1294    pub async fn create_federated_friend_request(
1295        &self,
1296        request: &FederatedFriendRequest,
1297    ) -> Result<(), StorageError> {
1298        self.client
1299            .execute(
1300                "INSERT INTO federated_friend_requests 
1301                (request_id, from_user_id, to_user_id, from_domain, to_domain, message, status, federation_event_id, created_at, updated_at)
1302                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
1303                &[
1304                    &request.request_id,
1305                    &request.from_user_id,
1306                    &request.to_user_id,
1307                    &request.from_domain,
1308                    &request.to_domain,
1309                    &request.message,
1310                    &request.status,
1311                    &request.federation_event_id,
1312                    &request.created_at,
1313                    &request.updated_at,
1314                ],
1315            )
1316            .await
1317            .map_err(|_| StorageError::Postgres)?;
1318        Ok(())
1319    }
1320
1321    /// Updates federated friend request status
1322    pub async fn update_federated_friend_request_status(
1323        &self,
1324        request_id: &str,
1325        status: &str,
1326    ) -> Result<(), StorageError> {
1327        let now = Utc::now();
1328        let affected = self
1329            .client
1330            .execute(
1331                "UPDATE federated_friend_requests SET status = $2, updated_at = $3 WHERE request_id = $1",
1332                &[&request_id, &status, &now],
1333            )
1334            .await
1335            .map_err(|_| StorageError::Postgres)?;
1336        if affected == 0 {
1337            return Err(StorageError::Missing);
1338        }
1339        Ok(())
1340    }
1341
1342    /// Lists federated friend requests for user
1343    pub async fn list_federated_friend_requests(
1344        &self,
1345        user_id: &str,
1346        incoming: bool,
1347    ) -> Result<Vec<FederatedFriendRequest>, StorageError> {
1348        let query = if incoming {
1349            "SELECT request_id, from_user_id, to_user_id, from_domain, to_domain, message, status, federation_event_id, created_at, updated_at
1350            FROM federated_friend_requests WHERE to_user_id = $1 ORDER BY created_at DESC"
1351        } else {
1352            "SELECT request_id, from_user_id, to_user_id, from_domain, to_domain, message, status, federation_event_id, created_at, updated_at
1353            FROM federated_friend_requests WHERE from_user_id = $1 ORDER BY created_at DESC"
1354        };
1355
1356        let rows = self
1357            .client
1358            .query(query, &[&user_id])
1359            .await
1360            .map_err(|_| StorageError::Postgres)?;
1361
1362        Ok(rows
1363            .into_iter()
1364            .map(|row| FederatedFriendRequest {
1365                request_id: row.get(0),
1366                from_user_id: row.get(1),
1367                to_user_id: row.get(2),
1368                from_domain: row.get(3),
1369                to_domain: row.get(4),
1370                message: row.get(5),
1371                status: row.get(6),
1372                federation_event_id: row.get(7),
1373                created_at: row.get(8),
1374                updated_at: row.get(9),
1375            })
1376            .collect())
1377    }
1378
1379    /// Creates a chat group entry and enrolls the owner as a member.
1380    pub async fn create_group(&self, group: &ChatGroup) -> Result<(), StorageError> {
1381        self.client
1382            .execute(
1383                "INSERT INTO chat_group (group_id, owner_device, created_at) VALUES ($1, $2, $3)
1384                ON CONFLICT (group_id) DO NOTHING",
1385                &[&group.group_id, &group.owner_device, &group.created_at],
1386            )
1387            .await
1388            .map_err(|_| StorageError::Postgres)?;
1389        self.client
1390            .execute(
1391                "INSERT INTO group_member (group_id, device_id, role, joined_at) VALUES ($1, $2, $3, $4)
1392                ON CONFLICT (group_id, device_id) DO UPDATE SET role = excluded.role",
1393                &[
1394                    &group.group_id,
1395                    &group.owner_device,
1396                    &GroupRole::Owner.as_str(),
1397                    &group.created_at,
1398                ],
1399            )
1400            .await
1401            .map_err(|_| StorageError::Postgres)?;
1402        Ok(())
1403    }
1404
1405    /// Adds or updates group membership information.
1406    pub async fn add_group_member(&self, member: &GroupMember) -> Result<(), StorageError> {
1407        let query = "INSERT INTO group_member (group_id, device_id, role, joined_at) VALUES ($1, $2, $3, $4)
1408            ON CONFLICT (group_id, device_id) DO UPDATE SET role = excluded.role, joined_at = excluded.joined_at";
1409        self.client
1410            .execute(
1411                query,
1412                &[
1413                    &member.group_id,
1414                    &member.device_id,
1415                    &member.role.as_str(),
1416                    &member.joined_at,
1417                ],
1418            )
1419            .await
1420            .map_err(|_| StorageError::Postgres)?;
1421        Ok(())
1422    }
1423
1424    /// Removes a member from the given group.
1425    pub async fn remove_group_member(
1426        &self,
1427        group_id: &str,
1428        device_id: &str,
1429    ) -> Result<(), StorageError> {
1430        let affected = self
1431            .client
1432            .execute(
1433                "DELETE FROM group_member WHERE group_id = $1 AND device_id = $2",
1434                &[&group_id, &device_id],
1435            )
1436            .await
1437            .map_err(|_| StorageError::Postgres)?;
1438        if affected == 0 {
1439            return Err(StorageError::Missing);
1440        }
1441        Ok(())
1442    }
1443
1444    /// Lists all members of a group ordered by join time.
1445    pub async fn list_group_members(
1446        &self,
1447        group_id: &str,
1448    ) -> Result<Vec<GroupMember>, StorageError> {
1449        let rows = self
1450            .client
1451            .query(
1452                "SELECT group_id, device_id, role, joined_at FROM group_member WHERE group_id = $1 ORDER BY joined_at ASC",
1453                &[&group_id],
1454            )
1455            .await
1456            .map_err(|_| StorageError::Postgres)?;
1457        let mut members = Vec::with_capacity(rows.len());
1458        for row in rows {
1459            let role: String = row.get(2);
1460            let parsed = GroupRole::from_str(role.as_str())?;
1461            members.push(GroupMember {
1462                group_id: row.get(0),
1463                device_id: row.get(1),
1464                role: parsed,
1465                joined_at: row.get(3),
1466            });
1467        }
1468        Ok(members)
1469    }
1470
1471    /// Loads group metadata by identifier.
1472    pub async fn load_group(&self, group_id: &str) -> Result<ChatGroup, StorageError> {
1473        let row = self
1474            .client
1475            .query_opt(
1476                "SELECT group_id, owner_device, created_at FROM chat_group WHERE group_id = $1",
1477                &[&group_id],
1478            )
1479            .await
1480            .map_err(|_| StorageError::Postgres)?;
1481        let row = row.ok_or(StorageError::Missing)?;
1482        Ok(ChatGroup {
1483            group_id: row.get(0),
1484            owner_device: row.get(1),
1485            created_at: row.get(2),
1486        })
1487    }
1488
1489    /// Lists groups that include the target device.
1490    pub async fn list_groups_for_device(
1491        &self,
1492        device_id: &str,
1493    ) -> Result<Vec<ChatGroup>, StorageError> {
1494        let rows = self
1495            .client
1496            .query(
1497                "SELECT g.group_id, g.owner_device, g.created_at FROM chat_group g
1498                INNER JOIN group_member m ON g.group_id = m.group_id
1499                WHERE m.device_id = $1 ORDER BY g.created_at ASC",
1500                &[&device_id],
1501            )
1502            .await
1503            .map_err(|_| StorageError::Postgres)?;
1504        Ok(rows
1505            .into_iter()
1506            .map(|row| ChatGroup {
1507                group_id: row.get(0),
1508                owner_device: row.get(1),
1509                created_at: row.get(2),
1510            })
1511            .collect())
1512    }
1513
1514    /// Upserts federation peer descriptors for S2S routing.
1515    pub async fn upsert_federation_peer(
1516        &self,
1517        peer: &FederationPeerRecord,
1518    ) -> Result<(), StorageError> {
1519        let query = "INSERT INTO federation_peer (domain, endpoint, public_key, status, updated_at)
1520            VALUES ($1, $2, $3, $4, $5)
1521            ON CONFLICT (domain) DO UPDATE SET endpoint = excluded.endpoint, public_key = excluded.public_key, status = excluded.status, updated_at = excluded.updated_at";
1522        self.client
1523            .execute(
1524                query,
1525                &[
1526                    &peer.domain,
1527                    &peer.endpoint,
1528                    &peer.public_key.as_slice(),
1529                    &peer.status.as_str(),
1530                    &peer.updated_at,
1531                ],
1532            )
1533            .await
1534            .map_err(|_| StorageError::Postgres)?;
1535        Ok(())
1536    }
1537
1538    /// Loads a federation peer by domain.
1539    pub async fn load_federation_peer(
1540        &self,
1541        domain: &str,
1542    ) -> Result<FederationPeerRecord, StorageError> {
1543        let row = self
1544            .client
1545            .query_opt(
1546                "SELECT domain, endpoint, public_key, status, updated_at FROM federation_peer WHERE domain = $1",
1547                &[&domain],
1548            )
1549            .await
1550            .map_err(|_| StorageError::Postgres)?;
1551        let row = row.ok_or(StorageError::Missing)?;
1552        let key: Vec<u8> = row.get(2);
1553        let status: String = row.get(3);
1554        let status = FederationPeerStatus::from_str(status.as_str())?;
1555        let public_key: [u8; 32] = key
1556            .as_slice()
1557            .try_into()
1558            .map_err(|_| StorageError::Serialization)?;
1559        Ok(FederationPeerRecord {
1560            domain: row.get(0),
1561            endpoint: row.get(1),
1562            public_key,
1563            status,
1564            updated_at: row.get(4),
1565        })
1566    }
1567
1568    /// Enumerates all known federation peers.
1569    pub async fn list_federation_peers(&self) -> Result<Vec<FederationPeerRecord>, StorageError> {
1570        let rows = self
1571            .client
1572            .query(
1573                "SELECT domain, endpoint, public_key, status, updated_at FROM federation_peer",
1574                &[],
1575            )
1576            .await
1577            .map_err(|_| StorageError::Postgres)?;
1578        let mut peers = Vec::with_capacity(rows.len());
1579        for row in rows {
1580            let key: Vec<u8> = row.get(2);
1581            let status: String = row.get(3);
1582            let status = FederationPeerStatus::from_str(status.as_str())?;
1583            let public_key: [u8; 32] = key
1584                .as_slice()
1585                .try_into()
1586                .map_err(|_| StorageError::Serialization)?;
1587            peers.push(FederationPeerRecord {
1588                domain: row.get(0),
1589                endpoint: row.get(1),
1590                public_key,
1591                status,
1592                updated_at: row.get(4),
1593            });
1594        }
1595        Ok(peers)
1596    }
1597
1598    /// Sets the peer status and refresh timestamp.
1599    pub async fn set_federation_peer_status(
1600        &self,
1601        domain: &str,
1602        status: FederationPeerStatus,
1603    ) -> Result<(), StorageError> {
1604        let now = Utc::now();
1605        let affected = self
1606            .client
1607            .execute(
1608                "UPDATE federation_peer SET status = $2, updated_at = $3 WHERE domain = $1",
1609                &[&domain, &status.as_str(), &now],
1610            )
1611            .await
1612            .map_err(|_| StorageError::Postgres)?;
1613        if affected == 0 {
1614            return Err(StorageError::Missing);
1615        }
1616        Ok(())
1617    }
1618
1619    /// Schedules an encrypted relay envelope for delivery.
1620    pub async fn enqueue_relay(&self, envelope: &RelayEnvelope) -> Result<(), StorageError> {
1621        let query =
1622            "INSERT INTO relay_queue (envelope_id, channel_id, payload, deliver_after, expires_at)
1623            VALUES ($1, $2, $3, $4, $5)";
1624        self.client
1625            .execute(
1626                query,
1627                &[
1628                    &envelope.envelope_id,
1629                    &envelope.channel_id,
1630                    &envelope.payload,
1631                    &envelope.deliver_after,
1632                    &envelope.expires_at,
1633                ],
1634            )
1635            .await
1636            .map_err(|_| StorageError::Postgres)?;
1637        Ok(())
1638    }
1639
1640    /// Claims pending relay envelopes for a channel.
1641    pub async fn claim_envelopes(
1642        &self,
1643        channel_id: &str,
1644        limit: i64,
1645    ) -> Result<Vec<RelayEnvelope>, StorageError> {
1646        let query = "DELETE FROM relay_queue
1647            WHERE envelope_id IN (
1648                SELECT envelope_id FROM relay_queue
1649                WHERE channel_id = $1 AND deliver_after <= now()
1650                ORDER BY deliver_after ASC
1651                LIMIT $2
1652            )
1653            RETURNING envelope_id, channel_id, payload, deliver_after, expires_at";
1654        let rows = self
1655            .client
1656            .query(query, &[&channel_id, &limit])
1657            .await
1658            .map_err(|_| StorageError::Postgres)?;
1659        Ok(rows
1660            .into_iter()
1661            .map(|row| RelayEnvelope {
1662                envelope_id: row.get(0),
1663                channel_id: row.get(1),
1664                payload: row.get(2),
1665                deliver_after: row.get(3),
1666                expires_at: row.get(4),
1667            })
1668            .collect())
1669    }
1670
1671    /// Registers a federation event for outbound delivery.
1672    pub async fn enqueue_federation_outbox(
1673        &self,
1674        record: &FederationOutboxInsert<'_>,
1675    ) -> Result<(), StorageError> {
1676        self.client
1677            .execute(
1678                "INSERT INTO federation_outbox (outbox_id, destination_domain, endpoint, payload, public_key, next_attempt_at)
1679                VALUES ($1, $2, $3, $4, $5, $6)
1680                ON CONFLICT (outbox_id) DO NOTHING",
1681                &[
1682                    &record.outbox_id,
1683                    &record.destination,
1684                    &record.endpoint,
1685                    &record.payload,
1686                    &&record.public_key[..],
1687                    &record.next_attempt_at,
1688                ],
1689            )
1690            .await
1691            .map_err(|_| StorageError::Postgres)?;
1692        Ok(())
1693    }
1694
1695    /// Claims due federation events and leases them for processing.
1696    pub async fn claim_federation_outbox(
1697        &self,
1698        limit: i64,
1699        lease: Duration,
1700        now: DateTime<Utc>,
1701    ) -> Result<Vec<FederationOutboxMessage>, StorageError> {
1702        let lease_deadline = now + lease;
1703        let query = "WITH due AS (
1704                SELECT outbox_id
1705                FROM federation_outbox
1706                WHERE next_attempt_at <= $1
1707                ORDER BY created_at ASC
1708                FOR UPDATE SKIP LOCKED
1709                LIMIT $2
1710            ),
1711            updated AS (
1712                UPDATE federation_outbox f
1713                SET next_attempt_at = $3,
1714                    attempts = f.attempts + 1,
1715                    last_error = NULL
1716                FROM due
1717                WHERE f.outbox_id = due.outbox_id
1718                RETURNING f.outbox_id, f.destination_domain, f.endpoint, f.payload, f.public_key, f.attempts, f.next_attempt_at, f.last_error
1719            )
1720            SELECT * FROM updated";
1721        let rows = self
1722            .client
1723            .query(query, &[&now, &limit, &lease_deadline])
1724            .await
1725            .map_err(|_| StorageError::Postgres)?;
1726        rows.into_iter()
1727            .map(|row| {
1728                let key: Vec<u8> = row.get(4);
1729                let public_key: [u8; 32] = key
1730                    .as_slice()
1731                    .try_into()
1732                    .map_err(|_| StorageError::Serialization)?;
1733                Ok(FederationOutboxMessage {
1734                    outbox_id: row.get(0),
1735                    destination: row.get(1),
1736                    endpoint: row.get(2),
1737                    payload: row.get(3),
1738                    public_key,
1739                    attempts: row.get(5),
1740                    next_attempt_at: row.get(6),
1741                    last_error: row.get(7),
1742                })
1743            })
1744            .collect()
1745    }
1746
1747    /// Removes a federation outbox entry once delivery succeeds.
1748    pub async fn delete_federation_outbox(&self, outbox_id: &str) -> Result<(), StorageError> {
1749        let affected = self
1750            .client
1751            .execute(
1752                "DELETE FROM federation_outbox WHERE outbox_id = $1",
1753                &[&outbox_id],
1754            )
1755            .await
1756            .map_err(|_| StorageError::Postgres)?;
1757        if affected == 0 {
1758            return Err(StorageError::Missing);
1759        }
1760        Ok(())
1761    }
1762
1763    /// Reschedules a federation outbox entry after a failed attempt.
1764    pub async fn reschedule_federation_outbox(
1765        &self,
1766        outbox_id: &str,
1767        delay: Duration,
1768        now: DateTime<Utc>,
1769        error: Option<&str>,
1770    ) -> Result<(), StorageError> {
1771        let next_attempt = now + delay;
1772        let affected = self
1773            .client
1774            .execute(
1775                "UPDATE federation_outbox SET next_attempt_at = $2, last_error = $3 WHERE outbox_id = $1",
1776                &[&outbox_id, &next_attempt, &error],
1777            )
1778            .await
1779            .map_err(|_| StorageError::Postgres)?;
1780        if affected == 0 {
1781            return Err(StorageError::Missing);
1782        }
1783        Ok(())
1784    }
1785
1786    /// Stores the last delivered envelope reference for an entity/channel pair.
1787    pub async fn store_inbox_offset(&self, offset: &InboxOffset) -> Result<(), StorageError> {
1788        let query = "INSERT INTO inbox_offset (entity_id, channel_id, last_envelope_id, updated_at)
1789            VALUES ($1, $2, $3, $4)
1790            ON CONFLICT (entity_id, channel_id) DO UPDATE SET last_envelope_id = excluded.last_envelope_id, updated_at = excluded.updated_at";
1791        self.client
1792            .execute(
1793                query,
1794                &[
1795                    &offset.entity_id,
1796                    &offset.channel_id,
1797                    &offset.last_envelope_id,
1798                    &offset.updated_at,
1799                ],
1800            )
1801            .await
1802            .map_err(|_| StorageError::Postgres)?;
1803        Ok(())
1804    }
1805
1806    /// Reads the stored inbox offset if present.
1807    pub async fn read_inbox_offset(
1808        &self,
1809        entity_id: &str,
1810        channel_id: &str,
1811    ) -> Result<Option<InboxOffset>, StorageError> {
1812        let row = self
1813            .client
1814            .query_opt(
1815                "SELECT entity_id, channel_id, last_envelope_id, updated_at FROM inbox_offset WHERE entity_id = $1 AND channel_id = $2",
1816                &[&entity_id, &channel_id],
1817            )
1818            .await
1819            .map_err(|_| StorageError::Postgres)?;
1820        Ok(row.map(|row| InboxOffset {
1821            entity_id: row.get(0),
1822            channel_id: row.get(1),
1823            last_envelope_id: row.get(2),
1824            updated_at: row.get(3),
1825        }))
1826    }
1827
1828    /// Records an idempotency key for deduplication.
1829    pub async fn store_idempotency(&self, key: &IdempotencyKey) -> Result<bool, StorageError> {
1830        let query = "INSERT INTO idempotency (key, scope, created_at) VALUES ($1, $2, $3)
1831            ON CONFLICT (key, scope) DO NOTHING RETURNING 1";
1832        let row = self
1833            .client
1834            .query_opt(query, &[&key.key, &key.scope, &key.created_at])
1835            .await
1836            .map_err(|_| StorageError::Postgres)?;
1837        Ok(row.is_some())
1838    }
1839
1840    /// Creates a friend request
1841    pub async fn create_friend_request(
1842        &self,
1843        id: &str,
1844        from_user_id: &str,
1845        to_user_id: &str,
1846        message: Option<&str>,
1847    ) -> Result<FriendRequest, StorageError> {
1848        let query = "INSERT INTO friend_requests (id, from_user_id, to_user_id, message, created_at, updated_at)
1849            VALUES ($1, $2, $3, $4, NOW(), NOW())
1850            ON CONFLICT (from_user_id, to_user_id) DO UPDATE
1851            SET status = 'pending', message = EXCLUDED.message, updated_at = NOW()
1852            RETURNING id, from_user_id, to_user_id, status, message, created_at, updated_at";
1853        let row = self
1854            .client
1855            .query_one(query, &[&id, &from_user_id, &to_user_id, &message])
1856            .await
1857            .map_err(|_| StorageError::Postgres)?;
1858        Ok(FriendRequest {
1859            id: row.get(0),
1860            from_user_id: row.get(1),
1861            to_user_id: row.get(2),
1862            status: row.get(3),
1863            message: row.get(4),
1864            created_at: row.get(5),
1865            updated_at: row.get(6),
1866        })
1867    }
1868
1869    /// Gets a friend request by ID
1870    pub async fn get_friend_request(&self, id: &str) -> Result<FriendRequest, StorageError> {
1871        let query = "SELECT id, from_user_id, to_user_id, status, message, created_at, updated_at
1872            FROM friend_requests WHERE id = $1";
1873        let row = self
1874            .client
1875            .query_opt(query, &[&id])
1876            .await
1877            .map_err(|_| StorageError::Postgres)?
1878            .ok_or(StorageError::Missing)?;
1879        Ok(FriendRequest {
1880            id: row.get(0),
1881            from_user_id: row.get(1),
1882            to_user_id: row.get(2),
1883            status: row.get(3),
1884            message: row.get(4),
1885            created_at: row.get(5),
1886            updated_at: row.get(6),
1887        })
1888    }
1889
1890    /// Lists incoming friend requests for a user
1891    pub async fn list_incoming_friend_requests(
1892        &self,
1893        user_id: &str,
1894    ) -> Result<Vec<FriendRequest>, StorageError> {
1895        let query = "SELECT id, from_user_id, to_user_id, status, message, created_at, updated_at
1896            FROM friend_requests
1897            WHERE to_user_id = $1 AND status = 'pending'
1898            ORDER BY created_at DESC";
1899        let rows = self
1900            .client
1901            .query(query, &[&user_id])
1902            .await
1903            .map_err(|_| StorageError::Postgres)?;
1904        Ok(rows
1905            .iter()
1906            .map(|row| FriendRequest {
1907                id: row.get(0),
1908                from_user_id: row.get(1),
1909                to_user_id: row.get(2),
1910                status: row.get(3),
1911                message: row.get(4),
1912                created_at: row.get(5),
1913                updated_at: row.get(6),
1914            })
1915            .collect())
1916    }
1917
1918    /// Lists outgoing friend requests from a user
1919    pub async fn list_outgoing_friend_requests(
1920        &self,
1921        user_id: &str,
1922    ) -> Result<Vec<FriendRequest>, StorageError> {
1923        let query = "SELECT id, from_user_id, to_user_id, status, message, created_at, updated_at
1924            FROM friend_requests
1925            WHERE from_user_id = $1
1926            ORDER BY created_at DESC";
1927        let rows = self
1928            .client
1929            .query(query, &[&user_id])
1930            .await
1931            .map_err(|_| StorageError::Postgres)?;
1932        Ok(rows
1933            .iter()
1934            .map(|row| FriendRequest {
1935                id: row.get(0),
1936                from_user_id: row.get(1),
1937                to_user_id: row.get(2),
1938                status: row.get(3),
1939                message: row.get(4),
1940                created_at: row.get(5),
1941                updated_at: row.get(6),
1942            })
1943            .collect())
1944    }
1945
1946    /// Accepts a friend request
1947    pub async fn accept_friend_request(&self, id: &str) -> Result<FriendRequest, StorageError> {
1948        let query = "UPDATE friend_requests
1949            SET status = 'accepted', updated_at = NOW()
1950            WHERE id = $1 AND status = 'pending'
1951            RETURNING id, from_user_id, to_user_id, status, message, created_at, updated_at";
1952        let row = self
1953            .client
1954            .query_opt(query, &[&id])
1955            .await
1956            .map_err(|_| StorageError::Postgres)?
1957            .ok_or(StorageError::Missing)?;
1958        Ok(FriendRequest {
1959            id: row.get(0),
1960            from_user_id: row.get(1),
1961            to_user_id: row.get(2),
1962            status: row.get(3),
1963            message: row.get(4),
1964            created_at: row.get(5),
1965            updated_at: row.get(6),
1966        })
1967    }
1968
1969    /// Rejects a friend request
1970    pub async fn reject_friend_request(&self, id: &str) -> Result<FriendRequest, StorageError> {
1971        let query = "UPDATE friend_requests
1972            SET status = 'rejected', updated_at = NOW()
1973            WHERE id = $1 AND status = 'pending'
1974            RETURNING id, from_user_id, to_user_id, status, message, created_at, updated_at";
1975        let row = self
1976            .client
1977            .query_opt(query, &[&id])
1978            .await
1979            .map_err(|_| StorageError::Postgres)?
1980            .ok_or(StorageError::Missing)?;
1981        Ok(FriendRequest {
1982            id: row.get(0),
1983            from_user_id: row.get(1),
1984            to_user_id: row.get(2),
1985            status: row.get(3),
1986            message: row.get(4),
1987            created_at: row.get(5),
1988            updated_at: row.get(6),
1989        })
1990    }
1991
1992    /// Deletes a friend request
1993    pub async fn delete_friend_request(&self, id: &str) -> Result<(), StorageError> {
1994        let query = "DELETE FROM friend_requests WHERE id = $1";
1995        self.client
1996            .execute(query, &[&id])
1997            .await
1998            .map_err(|_| StorageError::Postgres)?;
1999        Ok(())
2000    }
2001
2002    /// Checks if a friend request exists between two users
2003    pub async fn friend_request_exists(
2004        &self,
2005        from_user_id: &str,
2006        to_user_id: &str,
2007    ) -> Result<Option<FriendRequest>, StorageError> {
2008        let query = "SELECT id, from_user_id, to_user_id, status, message, created_at, updated_at
2009            FROM friend_requests
2010            WHERE from_user_id = $1 AND to_user_id = $2";
2011        let row = self
2012            .client
2013            .query_opt(query, &[&from_user_id, &to_user_id])
2014            .await
2015            .map_err(|_| StorageError::Postgres)?;
2016        Ok(row.map(|row| FriendRequest {
2017            id: row.get(0),
2018            from_user_id: row.get(1),
2019            to_user_id: row.get(2),
2020            status: row.get(3),
2021            message: row.get(4),
2022            created_at: row.get(5),
2023            updated_at: row.get(6),
2024        }))
2025    }
2026
2027    /// Publishes local presence information into Redis.
2028    pub async fn publish_presence(&self, snapshot: &PresenceSnapshot) -> Result<(), StorageError> {
2029        let mut conn = self.redis.lock().await;
2030        let ttl = (snapshot.expires_at.timestamp() - Utc::now().timestamp()).max(1) as usize;
2031        let payload = serde_json::json!({
2032            "entity": snapshot.entity.clone(),
2033            "state": snapshot.state.clone(),
2034            "expires_at": snapshot.expires_at.to_rfc3339(),
2035            "user": presence_user_payload(snapshot),
2036        })
2037        .to_string();
2038        redis::cmd("SETEX")
2039            .arg(format!("presence:{}", snapshot.entity))
2040            .arg(ttl)
2041            .arg(payload)
2042            .query_async::<()>(&mut *conn)
2043            .await
2044            .map_err(|_| StorageError::Redis)?;
2045        Ok(())
2046    }
2047
2048    /// Reads presence state from Redis.
2049    pub async fn read_presence(
2050        &self,
2051        entity: &str,
2052    ) -> Result<Option<PresenceSnapshot>, StorageError> {
2053        let mut conn = self.redis.lock().await;
2054        let value: Option<String> = redis::cmd("GET")
2055            .arg(format!("presence:{}", entity))
2056            .query_async::<Option<String>>(&mut *conn)
2057            .await
2058            .map_err(|_| StorageError::Redis)?;
2059        if let Some(json) = value {
2060            let parsed: Value =
2061                serde_json::from_str(&json).map_err(|_| StorageError::Serialization)?;
2062            let state = parsed
2063                .get("state")
2064                .and_then(|v| v.as_str())
2065                .unwrap_or("online")
2066                .to_string();
2067            let expires = parsed
2068                .get("expires_at")
2069                .and_then(|v| v.as_str())
2070                .ok_or(StorageError::Serialization)?;
2071            let expires = DateTime::parse_from_rfc3339(expires)
2072                .map_err(|_| StorageError::Serialization)?
2073                .with_timezone(&Utc);
2074            let user_obj = parsed.get("user").and_then(|v| v.as_object());
2075            let (user_id, handle, display_name, avatar_url) = if let Some(map) = user_obj {
2076                presence_user_fields(map)
2077            } else {
2078                (None, None, None, None)
2079            };
2080            Ok(Some(PresenceSnapshot {
2081                entity: entity.to_string(),
2082                state,
2083                expires_at: expires,
2084                user_id,
2085                handle,
2086                display_name,
2087                avatar_url,
2088            }))
2089        } else {
2090            Ok(None)
2091        }
2092    }
2093
2094    /// Registers a routing entry in Redis for direct message delivery.
2095    pub async fn register_route(
2096        &self,
2097        entity: &str,
2098        session_id: &str,
2099        ttl_seconds: i64,
2100    ) -> Result<(), StorageError> {
2101        let mut conn = self.redis.lock().await;
2102        redis::cmd("SETEX")
2103            .arg(format!("route:{}", entity))
2104            .arg(ttl_seconds.max(1) as usize)
2105            .arg(session_id)
2106            .query_async::<()>(&mut *conn)
2107            .await
2108            .map_err(|_| StorageError::Redis)?;
2109        Ok(())
2110    }
2111
2112    /// Removes a routing entry from Redis.
2113    pub async fn clear_route(&self, entity: &str) -> Result<(), StorageError> {
2114        let mut conn = self.redis.lock().await;
2115        let _: () = redis::cmd("DEL")
2116            .arg(format!("route:{}", entity))
2117            .query_async::<()>(&mut *conn)
2118            .await
2119            .map_err(|_| StorageError::Redis)?;
2120        Ok(())
2121    }
2122}
2123
2124fn generate_pair_code() -> String {
2125    let mut seed = [0u8; PAIRING_CODE_LENGTH];
2126    OsRng.fill_bytes(&mut seed);
2127    let mut output = String::with_capacity(PAIRING_CODE_LENGTH + 1);
2128    for (index, byte) in seed.iter().enumerate() {
2129        let symbol = PAIRING_ALPHABET[(*byte as usize) % PAIRING_ALPHABET.len()] as char;
2130        output.push(symbol);
2131        if index == (PAIRING_CODE_LENGTH / 2) - 1 {
2132            output.push('-');
2133        }
2134    }
2135    if output.ends_with('-') {
2136        output.pop();
2137    }
2138    output
2139}
2140
2141#[cfg(test)]
2142mod tests {
2143    use super::*;
2144    use chrono::{Duration, Utc};
2145    use serde_json::json;
2146    use std::str::FromStr;
2147
2148    #[test]
2149    fn init_sql_exists() {
2150        assert!(INIT_SQL.contains("CREATE TABLE"));
2151    }
2152
2153    #[test]
2154    fn pairing_code_format() {
2155        let code = generate_pair_code();
2156        assert_eq!(code.len(), PAIRING_CODE_LENGTH + 1);
2157        assert!(code.contains('-'));
2158    }
2159
2160    #[test]
2161    fn init_sql_declares_new_relations() {
2162        assert!(INIT_SQL.contains("device_key_event"));
2163        assert!(INIT_SQL.contains("chat_group"));
2164        assert!(INIT_SQL.contains("group_member"));
2165        assert!(INIT_SQL.contains("federation_peer"));
2166        assert!(INIT_SQL.contains("inbox_offset"));
2167    }
2168
2169    #[test]
2170    fn pairing_sql_declares_pairing_table() {
2171        assert!(PAIRING_SQL.contains("device_pairing"));
2172    }
2173
2174    #[test]
2175    fn group_role_roundtrip() {
2176        assert_eq!(GroupRole::Owner.as_str(), "owner");
2177        assert_eq!(GroupRole::from_str("admin").unwrap(), GroupRole::Admin);
2178        assert!(GroupRole::from_str("unknown").is_err());
2179    }
2180
2181    #[test]
2182    fn federation_status_roundtrip() {
2183        assert_eq!(FederationPeerStatus::Active.as_str(), "active");
2184        assert_eq!(
2185            FederationPeerStatus::from_str("pending").unwrap(),
2186            FederationPeerStatus::Pending
2187        );
2188        assert!(FederationPeerStatus::from_str("offline").is_err());
2189    }
2190
2191    #[test]
2192    fn presence_user_payload_emits_aliases() {
2193        let now = Utc::now();
2194        let snapshot = PresenceSnapshot {
2195            entity: "dev-1".to_string(),
2196            state: "online".to_string(),
2197            expires_at: now + Duration::seconds(30),
2198            user_id: Some("user-1".to_string()),
2199            handle: Some("alice".to_string()),
2200            display_name: Some("Alice".to_string()),
2201            avatar_url: None,
2202        };
2203        let payload = presence_user_payload(&snapshot).expect("payload");
2204        assert_eq!(payload["id"], serde_json::json!("user-1"));
2205        assert_eq!(payload["user_id"], serde_json::json!("user-1"));
2206        assert_eq!(payload["handle"], serde_json::json!("alice"));
2207    }
2208
2209    #[test]
2210    fn presence_user_fields_accepts_user_id_alias() {
2211        let mut map = serde_json::Map::new();
2212        map.insert("user_id".to_string(), serde_json::json!("user-99"));
2213        map.insert("handle".to_string(), serde_json::json!("eve"));
2214        let (user_id, handle, display_name, avatar_url) = presence_user_fields(&map);
2215        assert_eq!(user_id.as_deref(), Some("user-99"));
2216        assert_eq!(handle.as_deref(), Some("eve"));
2217        assert!(display_name.is_none());
2218        assert!(avatar_url.is_none());
2219    }
2220
2221    #[tokio::test]
2222    async fn storage_integration_flow() -> Result<(), Box<dyn std::error::Error>> {
2223        let pg = match std::env::var("COMMUCAT_TEST_PG_DSN") {
2224            Ok(value) => value,
2225            Err(_) => {
2226                eprintln!("skipping storage_integration_flow: COMMUCAT_TEST_PG_DSN not set");
2227                return Ok(());
2228            }
2229        };
2230        let redis = match std::env::var("COMMUCAT_TEST_REDIS_URL") {
2231            Ok(value) => value,
2232            Err(_) => {
2233                eprintln!("skipping storage_integration_flow: COMMUCAT_TEST_REDIS_URL not set");
2234                return Ok(());
2235            }
2236        };
2237        let storage = connect(&pg, &redis).await?;
2238        storage.migrate().await?;
2239        let suffix = Utc::now().timestamp_nanos_opt().unwrap_or_default();
2240        let user_profile = NewUserProfile {
2241            user_id: format!("test-user-{}", suffix),
2242            handle: format!("tester{}", suffix),
2243            domain: "local".to_string(),
2244            display_name: Some("Tester".to_string()),
2245            avatar_url: None,
2246        };
2247        let created = storage.create_user(&user_profile).await?;
2248        let device_id = format!("test-device-{}", suffix);
2249        let device_record = DeviceRecord {
2250            device_id: device_id.clone(),
2251            user_id: created.user_id.clone(),
2252            public_key: vec![1; 32],
2253            status: "active".to_string(),
2254            created_at: Utc::now(),
2255        };
2256        storage.upsert_device(&device_record).await?;
2257        let key_event = DeviceKeyEvent {
2258            event_id: format!("evt-{}", suffix),
2259            device_id: device_id.clone(),
2260            public_key: device_record.public_key.clone(),
2261            recorded_at: Utc::now(),
2262        };
2263        storage.record_device_key_event(&key_event).await?;
2264        let latest = storage
2265            .latest_device_key_event(&device_id)
2266            .await?
2267            .expect("expected key event");
2268        assert_eq!(latest.public_key.len(), 32);
2269
2270        let new_key = vec![2u8; 32];
2271        let rotation_id = format!("rot-{}", suffix);
2272        let rotation_event_id = format!("evt-rot-{}", suffix);
2273        let signature = vec![3u8; 64];
2274        let nonce = vec![4u8; 16];
2275        let applied_at = Utc::now();
2276        let proof_expires_at = applied_at + Duration::seconds(120);
2277        let rotation_record = DeviceRotationRecord {
2278            rotation_id: &rotation_id,
2279            device_id: &device_id,
2280            user_id: &created.user_id,
2281            old_public_key: device_record.public_key.as_slice(),
2282            new_public_key: new_key.as_slice(),
2283            signature: signature.as_slice(),
2284            nonce: Some(nonce.as_slice()),
2285            proof_expires_at,
2286            applied_at,
2287            event_id: &rotation_event_id,
2288        };
2289        storage.apply_device_key_rotation(&rotation_record).await?;
2290        let rotated = storage
2291            .latest_device_key_event(&device_id)
2292            .await?
2293            .expect("expected rotation event");
2294        assert_eq!(rotated.event_id, rotation_event_id);
2295        assert_eq!(rotated.public_key, new_key);
2296        let audit_row = storage
2297            .client
2298            .query_one(
2299                "SELECT old_public_key, new_public_key, proof_expires_at FROM device_rotation_audit WHERE rotation_id = $1",
2300                &[&rotation_id],
2301            )
2302            .await?;
2303        let stored_old: Vec<u8> = audit_row.get(0);
2304        let stored_new: Vec<u8> = audit_row.get(1);
2305        let stored_expiry: DateTime<Utc> = audit_row.get(2);
2306        assert_eq!(stored_old, device_record.public_key);
2307        assert_eq!(stored_new, new_key);
2308        assert_eq!(stored_expiry, proof_expires_at);
2309
2310        let group = ChatGroup {
2311            group_id: format!("group-{}", suffix),
2312            owner_device: device_id.clone(),
2313            created_at: Utc::now(),
2314        };
2315        storage.create_group(&group).await?;
2316        let member = GroupMember {
2317            group_id: group.group_id.clone(),
2318            device_id: format!("peer-device-{}", suffix),
2319            role: GroupRole::Member,
2320            joined_at: Utc::now(),
2321        };
2322        storage.add_group_member(&member).await?;
2323        let members = storage.list_group_members(&group.group_id).await?;
2324        assert!(members.iter().any(|m| m.device_id == device_id));
2325        assert!(members.iter().any(|m| m.device_id == member.device_id));
2326        let memberships = storage.list_groups_for_device(&member.device_id).await?;
2327        assert_eq!(memberships.len(), 1);
2328        storage
2329            .remove_group_member(&group.group_id, &member.device_id)
2330            .await?;
2331
2332        let peer = FederationPeerRecord {
2333            domain: format!("peer{}.example", suffix),
2334            endpoint: "https://peer.example/federation".to_string(),
2335            public_key: [5u8; 32],
2336            status: FederationPeerStatus::Active,
2337            updated_at: Utc::now(),
2338        };
2339        storage.upsert_federation_peer(&peer).await?;
2340        let fetched = storage.load_federation_peer(&peer.domain).await?;
2341        assert_eq!(fetched.endpoint, peer.endpoint);
2342
2343        let event_json = serde_json::json!({
2344            "event": {
2345                "event_id": format!("fed-{}", suffix),
2346                "origin": "remote.example",
2347                "created_at": Utc::now().to_rfc3339(),
2348                "payload": serde_json::json!({"channel": 1, "payload": "00"}),
2349                "scope": "relay",
2350            },
2351            "signature": vec![0u8; 64],
2352            "digest": vec![0u8; 32],
2353        });
2354        let outbox_id = format!("outbox-{}", suffix);
2355        let outbox_insert = FederationOutboxInsert {
2356            outbox_id: &outbox_id,
2357            destination: &peer.domain,
2358            endpoint: &peer.endpoint,
2359            payload: &event_json,
2360            public_key: &peer.public_key,
2361            next_attempt_at: Utc::now(),
2362        };
2363        storage.enqueue_federation_outbox(&outbox_insert).await?;
2364        let claimed = storage
2365            .claim_federation_outbox(8, Duration::seconds(30), Utc::now())
2366            .await?;
2367        assert_eq!(claimed.len(), 1);
2368        assert_eq!(claimed[0].outbox_id, outbox_id);
2369        storage
2370            .reschedule_federation_outbox(
2371                &outbox_id,
2372                Duration::seconds(5),
2373                Utc::now(),
2374                Some("boom"),
2375            )
2376            .await?;
2377        let after_delay = storage
2378            .claim_federation_outbox(8, Duration::seconds(30), Utc::now() + Duration::seconds(6))
2379            .await?;
2380        assert_eq!(after_delay.len(), 1);
2381        assert_eq!(after_delay[0].outbox_id, outbox_id);
2382        storage.delete_federation_outbox(&outbox_id).await?;
2383
2384        let offset = InboxOffset {
2385            entity_id: device_id.clone(),
2386            channel_id: format!("inbox:{}", device_id),
2387            last_envelope_id: Some(format!("env-{}", suffix)),
2388            updated_at: Utc::now(),
2389        };
2390        storage.store_inbox_offset(&offset).await?;
2391        let loaded = storage
2392            .read_inbox_offset(&offset.entity_id, &offset.channel_id)
2393            .await?
2394            .expect("offset present");
2395        assert_eq!(loaded.last_envelope_id, offset.last_envelope_id);
2396
2397        let ticket = storage
2398            .create_pairing_token(&created.user_id, &device_id, 300)
2399            .await?;
2400        assert_eq!(ticket.pair_code.len(), 9);
2401        let paired_device = format!("paired-device-{}", suffix);
2402        let claim = storage
2403            .claim_pairing_token(&ticket.pair_code, &paired_device, &[7u8; 32])
2404            .await?;
2405        assert_eq!(claim.user.user_id, created.user_id);
2406        assert_eq!(claim.issuer_device_id, device_id);
2407        storage
2408            .client
2409            .execute(
2410                "INSERT INTO device_pairing (pair_code, user_id, issuer_device_id, issued_at, expires_at, attempts) VALUES ($1, $2, $3, now(), now() - interval '10 minutes', 0)",
2411                &[&format!("expired-{}", suffix), &created.user_id, &device_id],
2412            )
2413            .await
2414            .map_err(|_| StorageError::Postgres)?;
2415        let purged = storage.invalidate_expired_pairings().await?;
2416        assert!(purged >= 1);
2417        storage
2418            .write_user_blob(&created.user_id, "friends", "[]")
2419            .await?;
2420        let blob = storage.read_user_blob(&created.user_id, "friends").await?;
2421        assert_eq!(blob.as_deref(), Some("[]"));
2422
2423        let secret_name = format!("noise-static-{}", suffix);
2424        let now = Utc::now();
2425        let secret_record = ServerSecretRecord {
2426            name: secret_name.clone(),
2427            version: 1,
2428            secret: vec![9u8; 32],
2429            public: Some(vec![8u8; 32]),
2430            metadata: json!({"purpose": "test"}),
2431            created_at: now,
2432            valid_after: now,
2433            rotates_at: now + Duration::hours(1),
2434            expires_at: now + Duration::hours(2),
2435        };
2436        storage.insert_server_secret(&secret_record).await?;
2437        let latest = storage.latest_server_secret_version(&secret_name).await?;
2438        assert_eq!(latest, 1);
2439        let active = storage
2440            .active_server_secrets(&secret_name, now + Duration::minutes(30))
2441            .await?;
2442        assert_eq!(active.len(), 1);
2443        assert_eq!(active[0].public.as_ref().map(|v| v.len()), Some(32));
2444        assert_eq!(active[0].metadata["purpose"], json!("test"));
2445        let removed = storage
2446            .delete_expired_server_secrets(&secret_name, now + Duration::hours(3))
2447            .await?;
2448        assert_eq!(removed, 1);
2449
2450        let idempotency_key = IdempotencyKey {
2451            key: format!("key-{}", suffix),
2452            scope: "federation-test".to_string(),
2453            created_at: Utc::now(),
2454        };
2455        assert!(storage.store_idempotency(&idempotency_key).await?);
2456        assert!(!storage.store_idempotency(&idempotency_key).await?);
2457
2458        Ok(())
2459    }
2460}