commucat_storage/
lib.rs

1use chrono::{DateTime, Duration, Utc};
2use rand::{rngs::OsRng, RngCore};
3use serde_json::Value;
4use std::convert::TryInto;
5use std::error::Error;
6use std::fmt::{Display, Formatter};
7use std::str::FromStr;
8use std::sync::Arc;
9use tokio::sync::Mutex;
10use tokio::task::JoinHandle;
11use tokio_postgres::{Client, NoTls};
12
/// Contents of `migrations/001_init.sql`, embedded at compile time; executed first by `Storage::migrate`.
const INIT_SQL: &str = include_str!("../migrations/001_init.sql");
/// Contents of `migrations/002_pairing.sql`, embedded at compile time; executed second by `Storage::migrate`.
const PAIRING_SQL: &str = include_str!("../migrations/002_pairing.sql");
/// Attempt ceiling for a pairing code: a claim only succeeds while the stored `attempts`
/// counter is below this value, and cleanup deletes rows whose counter has reached it.
const PAIRING_MAX_ATTEMPTS: i32 = 5;
/// NOTE(review): presumably the number of characters in a generated pairing code; the
/// generator is not visible in this part of the file — confirm.
const PAIRING_CODE_LENGTH: usize = 8;
/// Upper-case letters and digits with `I`, `L`, `O`, `0` and `1` left out.
/// NOTE(review): presumably the symbol set pairing codes are drawn from, omitting
/// look-alike characters — confirm against the generator.
const PAIRING_ALPHABET: &[u8] = b"ABCDEFGHJKMNPQRSTUVWXYZ23456789";
18
/// Coarse failure categories reported by the storage layer.
///
/// The variants carry no payload; the underlying driver error is discarded at
/// the point where it is mapped into one of these categories.
#[derive(Debug)]
pub enum StorageError {
    /// A PostgreSQL operation failed.
    Postgres,
    /// A Redis operation failed.
    Redis,
    /// A stored value could not be converted into its in-memory form.
    Serialization,
    /// The requested record does not exist.
    Missing,
    /// The record exists but is not in a state that permits the operation.
    Invalid,
}

impl Display for StorageError {
    /// Writes the fixed, human-readable message for the variant.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let message = match *self {
            StorageError::Postgres => "postgres failure",
            StorageError::Redis => "redis failure",
            StorageError::Serialization => "serialization failure",
            StorageError::Missing => "missing record",
            StorageError::Invalid => "invalid state",
        };
        f.write_str(message)
    }
}

impl Error for StorageError {}
41
/// Handle bundling the PostgreSQL client and the Redis connection used by the storage API.
pub struct Storage {
    /// PostgreSQL client through which the query methods issue their SQL.
    client: Client,
    /// Handle of the task spawned in `connect` that drives the PostgreSQL connection.
    _pg_task: JoinHandle<()>,
    /// Multiplexed Redis connection behind an async mutex, shareable via `Arc`.
    redis: Arc<Mutex<redis::aio::MultiplexedConnection>>,
}
47
/// Input for `Storage::create_user`: the caller-supplied columns of a new `app_user` row.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NewUserProfile {
    /// Value stored in the `user_id` column.
    pub user_id: String,
    /// Value stored in the `handle` column.
    pub handle: String,
    /// Optional value for the `display_name` column (`None` is stored as NULL).
    pub display_name: Option<String>,
    /// Optional value for the `avatar_url` column (`None` is stored as NULL).
    pub avatar_url: Option<String>,
}
55
/// A row of the `app_user` table as returned by the user loading and creation methods.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UserProfile {
    /// `user_id` column.
    pub user_id: String,
    /// `handle` column.
    pub handle: String,
    /// `display_name` column; `None` when NULL.
    pub display_name: Option<String>,
    /// `avatar_url` column; `None` when NULL.
    pub avatar_url: Option<String>,
    /// `created_at` column.
    pub created_at: DateTime<Utc>,
    /// `updated_at` column; `update_user_profile` refreshes it on every change.
    pub updated_at: DateTime<Utc>,
}
65
/// A row of the `user_device` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeviceRecord {
    /// Device identifier; stored in the `opaque_id` column.
    pub device_id: String,
    /// `user_id` column: the user the device is registered to.
    pub user_id: String,
    /// Raw key bytes; stored in the `pubkey` column.
    pub public_key: Vec<u8>,
    /// `status` column; this file writes the values `"active"` and `"revoked"`.
    pub status: String,
    /// `created_at` column.
    pub created_at: DateTime<Utc>,
}
74
/// A row of the `session` table, written by `record_session` and read by `load_session`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionRecord {
    /// Session identifier; stored in the `opaque_id` column.
    pub session_id: String,
    /// `user_id` column.
    pub user_id: String,
    /// `device_id` column.
    pub device_id: String,
    /// `tls_fingerprint` column.
    pub tls_fingerprint: String,
    /// `created_at` column.
    pub created_at: DateTime<Utc>,
    /// `ttl_seconds` column. NOTE(review): presumably the session lifetime counted from
    /// `created_at`; no expiry check is visible in this part of the file — confirm.
    pub ttl_seconds: i64,
}
84
/// An envelope passed to `Storage::enqueue_relay`.
///
/// NOTE(review): the body of `enqueue_relay` is not visible in this part of the file,
/// so the field notes below are inferred from names only — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RelayEnvelope {
    /// Identifier of this envelope.
    pub envelope_id: String,
    /// Identifier of the channel the envelope belongs to.
    pub channel_id: String,
    /// Envelope body as raw bytes.
    pub payload: Vec<u8>,
    /// Presumably the earliest time the envelope may be delivered — TODO confirm.
    pub deliver_after: DateTime<Utc>,
    /// Presumably the time after which the envelope is no longer deliverable — TODO confirm.
    pub expires_at: DateTime<Utc>,
}
93
/// A key/scope pair with a creation timestamp.
///
/// NOTE(review): not referenced in the visible part of the file; presumably used to
/// deduplicate repeated requests within a scope — confirm against its users.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IdempotencyKey {
    /// The key value.
    pub key: String,
    /// The scope the key applies to.
    pub scope: String,
    /// When the key was created.
    pub created_at: DateTime<Utc>,
}
100
/// Presence state for an entity together with optional user profile fields.
///
/// NOTE(review): not referenced in the visible part of the file; field meanings are
/// inferred from names and from the matching `UserProfile` fields — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PresenceSnapshot {
    /// Identifier of the entity the snapshot describes.
    pub entity: String,
    /// Presence state as free-form text; valid values are not defined here.
    pub state: String,
    /// Presumably the time after which the snapshot is stale — TODO confirm.
    pub expires_at: DateTime<Utc>,
    /// Optional user identifier associated with the entity.
    pub user_id: Option<String>,
    /// Optional user handle.
    pub handle: Option<String>,
    /// Optional display name.
    pub display_name: Option<String>,
    /// Optional avatar URL.
    pub avatar_url: Option<String>,
}
111
/// A row of the `device_key_event` table: one recorded public key for a device.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeviceKeyEvent {
    /// `event_id` column.
    pub event_id: String,
    /// `device_id` column.
    pub device_id: String,
    /// `public_key` column as raw bytes.
    pub public_key: Vec<u8>,
    /// `recorded_at` column; `latest_device_key_event` orders by it.
    pub recorded_at: DateTime<Utc>,
}
119
/// Result of `Storage::create_pairing_token`: the stored code and its validity window.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PairingTokenIssued {
    /// The generated code that was inserted into `device_pairing`.
    pub pair_code: String,
    /// Timestamp taken when the token was created.
    pub issued_at: DateTime<Utc>,
    /// `issued_at` plus the clamped time-to-live.
    pub expires_at: DateTime<Utc>,
}
126
/// Result of a successful `Storage::claim_pairing_token` call.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PairingClaimResult {
    /// Profile of the user the pairing code belonged to.
    pub user: UserProfile,
    /// Identifier of the device that issued the pairing code.
    pub issuer_device_id: String,
}
132
/// A row of the `chat_group` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChatGroup {
    /// `group_id` column.
    pub group_id: String,
    /// `owner_device` column; `create_group` also enrolls this device as a member with the owner role.
    pub owner_device: String,
    /// `created_at` column.
    pub created_at: DateTime<Utc>,
}
139
/// Role of a device inside a chat group, persisted as lower-case text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GroupRole {
    /// Stored as `"owner"`.
    Owner,
    /// Stored as `"admin"`.
    Admin,
    /// Stored as `"member"`.
    Member,
}

impl GroupRole {
    /// Returns the text form written to the `role` column.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Owner => "owner",
            Self::Admin => "admin",
            Self::Member => "member",
        }
    }
}
156
157impl FromStr for GroupRole {
158    type Err = StorageError;
159
160    fn from_str(value: &str) -> Result<Self, Self::Err> {
161        match value {
162            "owner" => Ok(GroupRole::Owner),
163            "admin" => Ok(GroupRole::Admin),
164            "member" => Ok(GroupRole::Member),
165            _ => Err(StorageError::Serialization),
166        }
167    }
168}
169
/// A row of the `group_member` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GroupMember {
    /// `group_id` column.
    pub group_id: String,
    /// `device_id` column.
    pub device_id: String,
    /// Parsed form of the `role` column.
    pub role: GroupRole,
    /// `joined_at` column; `list_group_members` orders by it.
    pub joined_at: DateTime<Utc>,
}
177
/// Status of a federation peer, persisted as lower-case text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FederationPeerStatus {
    /// Stored as `"active"`.
    Active,
    /// Stored as `"pending"`.
    Pending,
    /// Stored as `"blocked"`.
    Blocked,
}

impl FederationPeerStatus {
    /// Returns the text form written to the `status` column.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Active => "active",
            Self::Pending => "pending",
            Self::Blocked => "blocked",
        }
    }
}
194
195impl FromStr for FederationPeerStatus {
196    type Err = StorageError;
197
198    fn from_str(value: &str) -> Result<Self, Self::Err> {
199        match value {
200            "active" => Ok(FederationPeerStatus::Active),
201            "pending" => Ok(FederationPeerStatus::Pending),
202            "blocked" => Ok(FederationPeerStatus::Blocked),
203            _ => Err(StorageError::Serialization),
204        }
205    }
206}
207
/// A row of the `federation_peer` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FederationPeerRecord {
    /// `domain` column; the conflict target of `upsert_federation_peer`.
    pub domain: String,
    /// `endpoint` column.
    pub endpoint: String,
    /// `public_key` column; loading fails with `StorageError::Serialization`
    /// unless the stored value is exactly 32 bytes long.
    pub public_key: [u8; 32],
    /// Parsed form of the `status` column.
    pub status: FederationPeerStatus,
    /// `updated_at` column.
    pub updated_at: DateTime<Utc>,
}
216
/// Position marker for an entity within a channel.
///
/// NOTE(review): not referenced in the visible part of the file; presumably tracks the
/// last envelope an entity has consumed from a channel — confirm against its users.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InboxOffset {
    /// Identifier of the entity the offset belongs to.
    pub entity_id: String,
    /// Identifier of the channel the offset refers to.
    pub channel_id: String,
    /// Identifier of the last envelope, if any has been recorded.
    pub last_envelope_id: Option<String>,
    /// When the offset was last changed.
    pub updated_at: DateTime<Utc>,
}
224
225/// Establishes connectivity to PostgreSQL and Redis backends.
226pub async fn connect(postgres_dsn: &str, redis_url: &str) -> Result<Storage, StorageError> {
227    let (client, connection) = tokio_postgres::connect(postgres_dsn, NoTls)
228        .await
229        .map_err(|_| StorageError::Postgres)?;
230    let task = tokio::spawn(async move {
231        if let Err(error) = connection.await {
232            tracing::error!("postgres connection stopped: {}", error);
233        }
234    });
235    let redis_client = redis::Client::open(redis_url).map_err(|_| StorageError::Redis)?;
236    let redis_connection = redis_client
237        .get_multiplexed_async_connection()
238        .await
239        .map_err(|_| StorageError::Redis)?;
240    Ok(Storage {
241        client,
242        _pg_task: task,
243        redis: Arc::new(Mutex::new(redis_connection)),
244    })
245}
246
247impl Storage {
248    /// Applies bundled migrations to PostgreSQL.
249    pub async fn migrate(&self) -> Result<(), StorageError> {
250        self.client
251            .batch_execute(INIT_SQL)
252            .await
253            .map_err(|_| StorageError::Postgres)?;
254        self.client
255            .batch_execute(PAIRING_SQL)
256            .await
257            .map_err(|_| StorageError::Postgres)
258    }
259
260    /// Executes lightweight probes across PostgreSQL and Redis.
261    pub async fn readiness(&self) -> Result<(), StorageError> {
262        self.client
263            .simple_query("SELECT 1")
264            .await
265            .map_err(|_| StorageError::Postgres)?;
266        let mut conn = self.redis.lock().await;
267        let _: String = redis::cmd("PING")
268            .query_async::<_, String>(&mut *conn)
269            .await
270            .map_err(|_| StorageError::Redis)?;
271        Ok(())
272    }
273
274    /// Creates a short-lived pairing code bound to an issuer device.
275    pub async fn create_pairing_token(
276        &self,
277        user_id: &str,
278        issuer_device_id: &str,
279        ttl_seconds: i64,
280    ) -> Result<PairingTokenIssued, StorageError> {
281        let issuer = self.load_device(issuer_device_id).await?;
282        if issuer.user_id != user_id || issuer.status != "active" {
283            return Err(StorageError::Invalid);
284        }
285        let ttl = ttl_seconds.clamp(60, 3600);
286        let issued_at = Utc::now();
287        let expires_at = issued_at + Duration::seconds(ttl);
288        for _ in 0..16 {
289            let pair_code = generate_pair_code();
290            let inserted = self
291                .client
292                .execute(
293                    "INSERT INTO device_pairing (pair_code, user_id, issuer_device_id, issued_at, expires_at)
294                    VALUES ($1, $2, $3, $4, $5)
295                    ON CONFLICT (pair_code) DO NOTHING",
296                    &[&pair_code, &user_id, &issuer_device_id, &issued_at, &expires_at],
297                )
298                .await
299                .map_err(|_| StorageError::Postgres)?;
300            if inserted == 1 {
301                return Ok(PairingTokenIssued {
302                    pair_code,
303                    issued_at,
304                    expires_at,
305                });
306            }
307        }
308        Err(StorageError::Postgres)
309    }
310
311    /// Registers or rotates a device key.
312    pub async fn upsert_device(&self, record: &DeviceRecord) -> Result<(), StorageError> {
313        let query = "INSERT INTO user_device (opaque_id, user_id, pubkey, status, created_at) VALUES ($1, $2, $3, $4, $5)
314            ON CONFLICT (opaque_id) DO UPDATE SET pubkey = excluded.pubkey, status = excluded.status
315            WHERE user_device.user_id = excluded.user_id";
316        self.client
317            .execute(
318                query,
319                &[
320                    &record.device_id,
321                    &record.user_id,
322                    &record.public_key,
323                    &record.status,
324                    &record.created_at,
325                ],
326            )
327            .await
328            .map_err(|_| StorageError::Postgres)?;
329        Ok(())
330    }
331
332    /// Persists an audit trail entry for device key material.
333    pub async fn record_device_key_event(
334        &self,
335        event: &DeviceKeyEvent,
336    ) -> Result<(), StorageError> {
337        let query = "INSERT INTO device_key_event (event_id, device_id, public_key, recorded_at) VALUES ($1, $2, $3, $4)";
338        self.client
339            .execute(
340                query,
341                &[
342                    &event.event_id,
343                    &event.device_id,
344                    &event.public_key,
345                    &event.recorded_at,
346                ],
347            )
348            .await
349            .map_err(|_| StorageError::Postgres)?;
350        Ok(())
351    }
352
353    /// Claims a pairing token and registers a new device for the associated user.
354    pub async fn claim_pairing_token(
355        &self,
356        pair_code: &str,
357        device_id: &str,
358        public_key: &[u8],
359    ) -> Result<PairingClaimResult, StorageError> {
360        let recorded_at = Utc::now();
361        let event_id = format!(
362            "pair:{}:{}",
363            device_id,
364            recorded_at.timestamp_nanos_opt().unwrap_or_default()
365        );
366        let stmt = "WITH selected AS (
367                SELECT user_id, issuer_device_id, expires_at, redeemed_at, attempts
368                FROM device_pairing
369                WHERE pair_code = $1
370                FOR UPDATE
371            ),
372            validated AS (
373                SELECT user_id, issuer_device_id
374                FROM selected
375                WHERE expires_at > now()
376                  AND redeemed_at IS NULL
377                  AND attempts < $6
378            ),
379            updated AS (
380                UPDATE device_pairing
381                SET redeemed_at = $5,
382                    redeemed_device_id = $2,
383                    public_key = $3,
384                    attempts = LEAST(attempts + 1, $6)
385                WHERE pair_code = $1
386                  AND EXISTS (SELECT 1 FROM validated)
387                RETURNING user_id, issuer_device_id
388            ),
389            inserted AS (
390                INSERT INTO user_device (opaque_id, user_id, pubkey, status, created_at)
391                SELECT $2, user_id, $3, 'active', $5 FROM validated
392                RETURNING user_id
393            ),
394            events AS (
395                INSERT INTO device_key_event (event_id, device_id, public_key, recorded_at)
396                SELECT $4, $2, $3, $5 FROM inserted
397            )
398            SELECT user_id, issuer_device_id FROM updated";
399        let result = self
400            .client
401            .query_opt(
402                stmt,
403                &[
404                    &pair_code,
405                    &device_id,
406                    &public_key,
407                    &event_id,
408                    &recorded_at,
409                    &PAIRING_MAX_ATTEMPTS,
410                ],
411            )
412            .await
413            .map_err(|_| StorageError::Postgres)?;
414        let (user_id, issuer_device_id) = match result {
415            Some(row) => {
416                let user_id: String = row.get(0);
417                let issuer_device_id: String = row.get(1);
418                (user_id, issuer_device_id)
419            }
420            None => {
421                let exists = self
422                    .client
423                    .query_opt(
424                        "SELECT 1 FROM device_pairing WHERE pair_code = $1",
425                        &[&pair_code],
426                    )
427                    .await
428                    .map_err(|_| StorageError::Postgres)?;
429                if exists.is_some() {
430                    self
431                        .client
432                        .execute(
433                            "UPDATE device_pairing SET attempts = LEAST(attempts + 1, $2) WHERE pair_code = $1",
434                            &[&pair_code, &PAIRING_MAX_ATTEMPTS],
435                        )
436                        .await
437                        .map_err(|_| StorageError::Postgres)?;
438                    return Err(StorageError::Invalid);
439                }
440                return Err(StorageError::Missing);
441            }
442        };
443        let profile = self.load_user(&user_id).await?;
444        Ok(PairingClaimResult {
445            user: profile,
446            issuer_device_id,
447        })
448    }
449
450    /// Removes expired or exhausted pairing tokens.
451    pub async fn invalidate_expired_pairings(&self) -> Result<u64, StorageError> {
452        let affected = self
453            .client
454            .execute(
455                "DELETE FROM device_pairing WHERE expires_at <= now() OR attempts >= $1 OR (redeemed_at IS NOT NULL AND redeemed_at <= now() - interval '1 day')",
456                &[&PAIRING_MAX_ATTEMPTS],
457            )
458            .await
459            .map_err(|_| StorageError::Postgres)?;
460        Ok(affected)
461    }
462
463    /// Fetches the newest device key event for a device identifier.
464    pub async fn latest_device_key_event(
465        &self,
466        device_id: &str,
467    ) -> Result<Option<DeviceKeyEvent>, StorageError> {
468        let query = "SELECT event_id, device_id, public_key, recorded_at
469            FROM device_key_event WHERE device_id = $1 ORDER BY recorded_at DESC LIMIT 1";
470        let row = self
471            .client
472            .query_opt(query, &[&device_id])
473            .await
474            .map_err(|_| StorageError::Postgres)?;
475        Ok(row.map(|row| DeviceKeyEvent {
476            event_id: row.get(0),
477            device_id: row.get(1),
478            public_key: row.get(2),
479            recorded_at: row.get(3),
480        }))
481    }
482
483    /// Creates a session binding a device to a TLS fingerprint.
484    pub async fn record_session(&self, session: &SessionRecord) -> Result<(), StorageError> {
485        let query =
486            "INSERT INTO session (opaque_id, user_id, device_id, tls_fingerprint, created_at, ttl_seconds)
487            VALUES ($1, $2, $3, $4, $5, $6)";
488        self.client
489            .execute(
490                query,
491                &[
492                    &session.session_id,
493                    &session.user_id,
494                    &session.device_id,
495                    &session.tls_fingerprint,
496                    &session.created_at,
497                    &session.ttl_seconds,
498                ],
499            )
500            .await
501            .map_err(|_| StorageError::Postgres)?;
502        Ok(())
503    }
504
505    /// Loads a persisted session by identifier.
506    pub async fn load_session(&self, session_id: &str) -> Result<SessionRecord, StorageError> {
507        let row = self
508            .client
509            .query_opt(
510                "SELECT opaque_id, user_id, device_id, tls_fingerprint, created_at, ttl_seconds FROM session WHERE opaque_id = $1",
511                &[&session_id],
512            )
513            .await
514            .map_err(|_| StorageError::Postgres)?;
515        let row = row.ok_or(StorageError::Missing)?;
516        Ok(SessionRecord {
517            session_id: row.get(0),
518            user_id: row.get(1),
519            device_id: row.get(2),
520            tls_fingerprint: row.get(3),
521            created_at: row.get(4),
522            ttl_seconds: row.get(5),
523        })
524    }
525
526    /// Fetches device metadata by identifier.
527    pub async fn load_device(&self, device_id: &str) -> Result<DeviceRecord, StorageError> {
528        let row = self
529            .client
530            .query_opt(
531                "SELECT opaque_id, user_id, pubkey, status, created_at FROM user_device WHERE opaque_id = $1",
532                &[&device_id],
533            )
534            .await
535            .map_err(|_| StorageError::Postgres)?;
536        let row = row.ok_or(StorageError::Missing)?;
537        Ok(DeviceRecord {
538            device_id: row.get(0),
539            user_id: row.get(1),
540            public_key: row.get(2),
541            status: row.get(3),
542            created_at: row.get(4),
543        })
544    }
545
546    /// Counts active devices registered for a user.
547    pub async fn count_active_devices(&self, user_id: &str) -> Result<i64, StorageError> {
548        let row = self
549            .client
550            .query_one(
551                "SELECT COUNT(*) FROM user_device WHERE user_id = $1 AND status = 'active'",
552                &[&user_id],
553            )
554            .await
555            .map_err(|_| StorageError::Postgres)?;
556        Ok(row.get(0))
557    }
558
559    /// Lists devices associated with a user ordered by creation time.
560    pub async fn list_devices_for_user(
561        &self,
562        user_id: &str,
563    ) -> Result<Vec<DeviceRecord>, StorageError> {
564        let rows = self
565            .client
566            .query(
567                "SELECT opaque_id, user_id, pubkey, status, created_at FROM user_device WHERE user_id = $1 ORDER BY created_at ASC",
568                &[&user_id],
569            )
570            .await
571            .map_err(|_| StorageError::Postgres)?;
572        Ok(rows
573            .into_iter()
574            .map(|row| DeviceRecord {
575                device_id: row.get(0),
576                user_id: row.get(1),
577                public_key: row.get(2),
578                status: row.get(3),
579                created_at: row.get(4),
580            })
581            .collect())
582    }
583
584    /// Marks a device as active.
585    pub async fn activate_device(&self, device_id: &str) -> Result<(), StorageError> {
586        self.update_device_status(device_id, "active").await
587    }
588
589    /// Marks a device as revoked.
590    pub async fn deactivate_device(&self, device_id: &str) -> Result<(), StorageError> {
591        self.update_device_status(device_id, "revoked").await
592    }
593
594    async fn update_device_status(
595        &self,
596        device_id: &str,
597        status: &str,
598    ) -> Result<(), StorageError> {
599        let affected = self
600            .client
601            .execute(
602                "UPDATE user_device SET status = $2 WHERE opaque_id = $1",
603                &[&device_id, &status],
604            )
605            .await
606            .map_err(|_| StorageError::Postgres)?;
607        if affected == 0 {
608            return Err(StorageError::Missing);
609        }
610        Ok(())
611    }
612
613    /// Creates a new user profile entry.
614    pub async fn create_user(&self, profile: &NewUserProfile) -> Result<UserProfile, StorageError> {
615        let now = Utc::now();
616        let row = self
617            .client
618            .query_one(
619                "INSERT INTO app_user (user_id, handle, display_name, avatar_url, created_at, updated_at)
620                VALUES ($1, $2, $3, $4, $5, $5)
621                RETURNING user_id, handle, display_name, avatar_url, created_at, updated_at",
622                &[
623                    &profile.user_id,
624                    &profile.handle,
625                    &profile.display_name,
626                    &profile.avatar_url,
627                    &now,
628                ],
629            )
630            .await
631            .map_err(|_| StorageError::Postgres)?;
632        Ok(UserProfile {
633            user_id: row.get(0),
634            handle: row.get(1),
635            display_name: row.get(2),
636            avatar_url: row.get(3),
637            created_at: row.get(4),
638            updated_at: row.get(5),
639        })
640    }
641
642    /// Loads a user profile by identifier.
643    pub async fn load_user(&self, user_id: &str) -> Result<UserProfile, StorageError> {
644        let row = self
645            .client
646            .query_opt(
647                "SELECT user_id, handle, display_name, avatar_url, created_at, updated_at FROM app_user WHERE user_id = $1",
648                &[&user_id],
649            )
650            .await
651            .map_err(|_| StorageError::Postgres)?;
652        let row = row.ok_or(StorageError::Missing)?;
653        Ok(UserProfile {
654            user_id: row.get(0),
655            handle: row.get(1),
656            display_name: row.get(2),
657            avatar_url: row.get(3),
658            created_at: row.get(4),
659            updated_at: row.get(5),
660        })
661    }
662
663    /// Loads a user profile by handle.
664    pub async fn load_user_by_handle(&self, handle: &str) -> Result<UserProfile, StorageError> {
665        let row = self
666            .client
667            .query_opt(
668                "SELECT user_id, handle, display_name, avatar_url, created_at, updated_at FROM app_user WHERE handle = $1",
669                &[&handle],
670            )
671            .await
672            .map_err(|_| StorageError::Postgres)?;
673        let row = row.ok_or(StorageError::Missing)?;
674        Ok(UserProfile {
675            user_id: row.get(0),
676            handle: row.get(1),
677            display_name: row.get(2),
678            avatar_url: row.get(3),
679            created_at: row.get(4),
680            updated_at: row.get(5),
681        })
682    }
683
684    /// Applies partial updates to user profile metadata.
685    pub async fn update_user_profile(
686        &self,
687        user_id: &str,
688        display_name: Option<&str>,
689        avatar_url: Option<&str>,
690    ) -> Result<(), StorageError> {
691        let now = Utc::now();
692        let affected = self
693            .client
694            .execute(
695                "UPDATE app_user SET display_name = COALESCE($2, display_name), avatar_url = COALESCE($3, avatar_url), updated_at = $4 WHERE user_id = $1",
696                &[&user_id, &display_name, &avatar_url, &now],
697            )
698            .await
699            .map_err(|_| StorageError::Postgres)?;
700        if affected == 0 {
701            return Err(StorageError::Missing);
702        }
703        Ok(())
704    }
705
706    /// Creates a chat group entry and enrolls the owner as a member.
707    pub async fn create_group(&self, group: &ChatGroup) -> Result<(), StorageError> {
708        self.client
709            .execute(
710                "INSERT INTO chat_group (group_id, owner_device, created_at) VALUES ($1, $2, $3)
711                ON CONFLICT (group_id) DO NOTHING",
712                &[&group.group_id, &group.owner_device, &group.created_at],
713            )
714            .await
715            .map_err(|_| StorageError::Postgres)?;
716        self.client
717            .execute(
718                "INSERT INTO group_member (group_id, device_id, role, joined_at) VALUES ($1, $2, $3, $4)
719                ON CONFLICT (group_id, device_id) DO UPDATE SET role = excluded.role",
720                &[
721                    &group.group_id,
722                    &group.owner_device,
723                    &GroupRole::Owner.as_str(),
724                    &group.created_at,
725                ],
726            )
727            .await
728            .map_err(|_| StorageError::Postgres)?;
729        Ok(())
730    }
731
732    /// Adds or updates group membership information.
733    pub async fn add_group_member(&self, member: &GroupMember) -> Result<(), StorageError> {
734        let query = "INSERT INTO group_member (group_id, device_id, role, joined_at) VALUES ($1, $2, $3, $4)
735            ON CONFLICT (group_id, device_id) DO UPDATE SET role = excluded.role, joined_at = excluded.joined_at";
736        self.client
737            .execute(
738                query,
739                &[
740                    &member.group_id,
741                    &member.device_id,
742                    &member.role.as_str(),
743                    &member.joined_at,
744                ],
745            )
746            .await
747            .map_err(|_| StorageError::Postgres)?;
748        Ok(())
749    }
750
751    /// Removes a member from the given group.
752    pub async fn remove_group_member(
753        &self,
754        group_id: &str,
755        device_id: &str,
756    ) -> Result<(), StorageError> {
757        let affected = self
758            .client
759            .execute(
760                "DELETE FROM group_member WHERE group_id = $1 AND device_id = $2",
761                &[&group_id, &device_id],
762            )
763            .await
764            .map_err(|_| StorageError::Postgres)?;
765        if affected == 0 {
766            return Err(StorageError::Missing);
767        }
768        Ok(())
769    }
770
771    /// Lists all members of a group ordered by join time.
772    pub async fn list_group_members(
773        &self,
774        group_id: &str,
775    ) -> Result<Vec<GroupMember>, StorageError> {
776        let rows = self
777            .client
778            .query(
779                "SELECT group_id, device_id, role, joined_at FROM group_member WHERE group_id = $1 ORDER BY joined_at ASC",
780                &[&group_id],
781            )
782            .await
783            .map_err(|_| StorageError::Postgres)?;
784        let mut members = Vec::with_capacity(rows.len());
785        for row in rows {
786            let role: String = row.get(2);
787            let parsed = GroupRole::from_str(role.as_str())?;
788            members.push(GroupMember {
789                group_id: row.get(0),
790                device_id: row.get(1),
791                role: parsed,
792                joined_at: row.get(3),
793            });
794        }
795        Ok(members)
796    }
797
798    /// Loads group metadata by identifier.
799    pub async fn load_group(&self, group_id: &str) -> Result<ChatGroup, StorageError> {
800        let row = self
801            .client
802            .query_opt(
803                "SELECT group_id, owner_device, created_at FROM chat_group WHERE group_id = $1",
804                &[&group_id],
805            )
806            .await
807            .map_err(|_| StorageError::Postgres)?;
808        let row = row.ok_or(StorageError::Missing)?;
809        Ok(ChatGroup {
810            group_id: row.get(0),
811            owner_device: row.get(1),
812            created_at: row.get(2),
813        })
814    }
815
816    /// Lists groups that include the target device.
817    pub async fn list_groups_for_device(
818        &self,
819        device_id: &str,
820    ) -> Result<Vec<ChatGroup>, StorageError> {
821        let rows = self
822            .client
823            .query(
824                "SELECT g.group_id, g.owner_device, g.created_at FROM chat_group g
825                INNER JOIN group_member m ON g.group_id = m.group_id
826                WHERE m.device_id = $1 ORDER BY g.created_at ASC",
827                &[&device_id],
828            )
829            .await
830            .map_err(|_| StorageError::Postgres)?;
831        Ok(rows
832            .into_iter()
833            .map(|row| ChatGroup {
834                group_id: row.get(0),
835                owner_device: row.get(1),
836                created_at: row.get(2),
837            })
838            .collect())
839    }
840
841    /// Upserts federation peer descriptors for S2S routing.
842    pub async fn upsert_federation_peer(
843        &self,
844        peer: &FederationPeerRecord,
845    ) -> Result<(), StorageError> {
846        let query = "INSERT INTO federation_peer (domain, endpoint, public_key, status, updated_at)
847            VALUES ($1, $2, $3, $4, $5)
848            ON CONFLICT (domain) DO UPDATE SET endpoint = excluded.endpoint, public_key = excluded.public_key, status = excluded.status, updated_at = excluded.updated_at";
849        self.client
850            .execute(
851                query,
852                &[
853                    &peer.domain,
854                    &peer.endpoint,
855                    &peer.public_key.as_slice(),
856                    &peer.status.as_str(),
857                    &peer.updated_at,
858                ],
859            )
860            .await
861            .map_err(|_| StorageError::Postgres)?;
862        Ok(())
863    }
864
865    /// Loads a federation peer by domain.
866    pub async fn load_federation_peer(
867        &self,
868        domain: &str,
869    ) -> Result<FederationPeerRecord, StorageError> {
870        let row = self
871            .client
872            .query_opt(
873                "SELECT domain, endpoint, public_key, status, updated_at FROM federation_peer WHERE domain = $1",
874                &[&domain],
875            )
876            .await
877            .map_err(|_| StorageError::Postgres)?;
878        let row = row.ok_or(StorageError::Missing)?;
879        let key: Vec<u8> = row.get(2);
880        let status: String = row.get(3);
881        let status = FederationPeerStatus::from_str(status.as_str())?;
882        let public_key: [u8; 32] = key
883            .as_slice()
884            .try_into()
885            .map_err(|_| StorageError::Serialization)?;
886        Ok(FederationPeerRecord {
887            domain: row.get(0),
888            endpoint: row.get(1),
889            public_key,
890            status,
891            updated_at: row.get(4),
892        })
893    }
894
895    /// Enumerates all known federation peers.
896    pub async fn list_federation_peers(&self) -> Result<Vec<FederationPeerRecord>, StorageError> {
897        let rows = self
898            .client
899            .query(
900                "SELECT domain, endpoint, public_key, status, updated_at FROM federation_peer",
901                &[],
902            )
903            .await
904            .map_err(|_| StorageError::Postgres)?;
905        let mut peers = Vec::with_capacity(rows.len());
906        for row in rows {
907            let key: Vec<u8> = row.get(2);
908            let status: String = row.get(3);
909            let status = FederationPeerStatus::from_str(status.as_str())?;
910            let public_key: [u8; 32] = key
911                .as_slice()
912                .try_into()
913                .map_err(|_| StorageError::Serialization)?;
914            peers.push(FederationPeerRecord {
915                domain: row.get(0),
916                endpoint: row.get(1),
917                public_key,
918                status,
919                updated_at: row.get(4),
920            });
921        }
922        Ok(peers)
923    }
924
925    /// Sets the peer status and refresh timestamp.
926    pub async fn set_federation_peer_status(
927        &self,
928        domain: &str,
929        status: FederationPeerStatus,
930    ) -> Result<(), StorageError> {
931        let now = Utc::now();
932        let affected = self
933            .client
934            .execute(
935                "UPDATE federation_peer SET status = $2, updated_at = $3 WHERE domain = $1",
936                &[&domain, &status.as_str(), &now],
937            )
938            .await
939            .map_err(|_| StorageError::Postgres)?;
940        if affected == 0 {
941            return Err(StorageError::Missing);
942        }
943        Ok(())
944    }
945
946    /// Schedules an encrypted relay envelope for delivery.
947    pub async fn enqueue_relay(&self, envelope: &RelayEnvelope) -> Result<(), StorageError> {
948        let query =
949            "INSERT INTO relay_queue (envelope_id, channel_id, payload, deliver_after, expires_at)
950            VALUES ($1, $2, $3, $4, $5)";
951        self.client
952            .execute(
953                query,
954                &[
955                    &envelope.envelope_id,
956                    &envelope.channel_id,
957                    &envelope.payload,
958                    &envelope.deliver_after,
959                    &envelope.expires_at,
960                ],
961            )
962            .await
963            .map_err(|_| StorageError::Postgres)?;
964        Ok(())
965    }
966
967    /// Claims pending relay envelopes for a channel.
968    pub async fn claim_envelopes(
969        &self,
970        channel_id: &str,
971        limit: i64,
972    ) -> Result<Vec<RelayEnvelope>, StorageError> {
973        let query = "DELETE FROM relay_queue
974            WHERE envelope_id IN (
975                SELECT envelope_id FROM relay_queue
976                WHERE channel_id = $1 AND deliver_after <= now()
977                ORDER BY deliver_after ASC
978                LIMIT $2
979            )
980            RETURNING envelope_id, channel_id, payload, deliver_after, expires_at";
981        let rows = self
982            .client
983            .query(query, &[&channel_id, &limit])
984            .await
985            .map_err(|_| StorageError::Postgres)?;
986        Ok(rows
987            .into_iter()
988            .map(|row| RelayEnvelope {
989                envelope_id: row.get(0),
990                channel_id: row.get(1),
991                payload: row.get(2),
992                deliver_after: row.get(3),
993                expires_at: row.get(4),
994            })
995            .collect())
996    }
997
998    /// Stores the last delivered envelope reference for an entity/channel pair.
999    pub async fn store_inbox_offset(&self, offset: &InboxOffset) -> Result<(), StorageError> {
1000        let query = "INSERT INTO inbox_offset (entity_id, channel_id, last_envelope_id, updated_at)
1001            VALUES ($1, $2, $3, $4)
1002            ON CONFLICT (entity_id, channel_id) DO UPDATE SET last_envelope_id = excluded.last_envelope_id, updated_at = excluded.updated_at";
1003        self.client
1004            .execute(
1005                query,
1006                &[
1007                    &offset.entity_id,
1008                    &offset.channel_id,
1009                    &offset.last_envelope_id,
1010                    &offset.updated_at,
1011                ],
1012            )
1013            .await
1014            .map_err(|_| StorageError::Postgres)?;
1015        Ok(())
1016    }
1017
1018    /// Reads the stored inbox offset if present.
1019    pub async fn read_inbox_offset(
1020        &self,
1021        entity_id: &str,
1022        channel_id: &str,
1023    ) -> Result<Option<InboxOffset>, StorageError> {
1024        let row = self
1025            .client
1026            .query_opt(
1027                "SELECT entity_id, channel_id, last_envelope_id, updated_at FROM inbox_offset WHERE entity_id = $1 AND channel_id = $2",
1028                &[&entity_id, &channel_id],
1029            )
1030            .await
1031            .map_err(|_| StorageError::Postgres)?;
1032        Ok(row.map(|row| InboxOffset {
1033            entity_id: row.get(0),
1034            channel_id: row.get(1),
1035            last_envelope_id: row.get(2),
1036            updated_at: row.get(3),
1037        }))
1038    }
1039
1040    /// Records an idempotency key for deduplication.
1041    pub async fn store_idempotency(&self, key: &IdempotencyKey) -> Result<(), StorageError> {
1042        let query = "INSERT INTO idempotency (key, scope, created_at) VALUES ($1, $2, $3)
1043            ON CONFLICT (key, scope) DO NOTHING";
1044        self.client
1045            .execute(query, &[&key.key, &key.scope, &key.created_at])
1046            .await
1047            .map_err(|_| StorageError::Postgres)?;
1048        Ok(())
1049    }
1050
1051    /// Publishes local presence information into Redis.
1052    pub async fn publish_presence(&self, snapshot: &PresenceSnapshot) -> Result<(), StorageError> {
1053        let mut conn = self.redis.lock().await;
1054        let ttl = (snapshot.expires_at.timestamp() - Utc::now().timestamp()).max(1) as usize;
1055        let payload = serde_json::json!({
1056            "entity": snapshot.entity,
1057            "state": snapshot.state,
1058            "expires_at": snapshot.expires_at.to_rfc3339(),
1059            "user": snapshot.user_id.as_ref().map(|id| serde_json::json!({
1060                "id": id,
1061                "handle": snapshot.handle.clone(),
1062                "display_name": snapshot.display_name.clone(),
1063                "avatar_url": snapshot.avatar_url.clone(),
1064            })),
1065        })
1066        .to_string();
1067        redis::cmd("SETEX")
1068            .arg(format!("presence:{}", snapshot.entity))
1069            .arg(ttl)
1070            .arg(payload)
1071            .query_async::<_, ()>(&mut *conn)
1072            .await
1073            .map_err(|_| StorageError::Redis)?;
1074        Ok(())
1075    }
1076
1077    /// Reads presence state from Redis.
1078    pub async fn read_presence(
1079        &self,
1080        entity: &str,
1081    ) -> Result<Option<PresenceSnapshot>, StorageError> {
1082        let mut conn = self.redis.lock().await;
1083        let value: Option<String> = redis::cmd("GET")
1084            .arg(format!("presence:{}", entity))
1085            .query_async::<_, Option<String>>(&mut *conn)
1086            .await
1087            .map_err(|_| StorageError::Redis)?;
1088        if let Some(json) = value {
1089            let parsed: Value =
1090                serde_json::from_str(&json).map_err(|_| StorageError::Serialization)?;
1091            let state = parsed
1092                .get("state")
1093                .and_then(|v| v.as_str())
1094                .unwrap_or("online")
1095                .to_string();
1096            let expires = parsed
1097                .get("expires_at")
1098                .and_then(|v| v.as_str())
1099                .ok_or(StorageError::Serialization)?;
1100            let expires = DateTime::parse_from_rfc3339(expires)
1101                .map_err(|_| StorageError::Serialization)?
1102                .with_timezone(&Utc);
1103            let user_obj = parsed.get("user").and_then(|v| v.as_object());
1104            let user_id = user_obj
1105                .and_then(|map| map.get("id"))
1106                .and_then(|v| v.as_str())
1107                .map(|v| v.to_string());
1108            let handle = user_obj
1109                .and_then(|map| map.get("handle"))
1110                .and_then(|v| v.as_str())
1111                .map(|v| v.to_string());
1112            let display_name = user_obj
1113                .and_then(|map| map.get("display_name"))
1114                .and_then(|v| v.as_str())
1115                .map(|v| v.to_string());
1116            let avatar_url = user_obj
1117                .and_then(|map| map.get("avatar_url"))
1118                .and_then(|v| v.as_str())
1119                .map(|v| v.to_string());
1120            Ok(Some(PresenceSnapshot {
1121                entity: entity.to_string(),
1122                state,
1123                expires_at: expires,
1124                user_id,
1125                handle,
1126                display_name,
1127                avatar_url,
1128            }))
1129        } else {
1130            Ok(None)
1131        }
1132    }
1133
1134    /// Registers a routing entry in Redis for direct message delivery.
1135    pub async fn register_route(
1136        &self,
1137        entity: &str,
1138        session_id: &str,
1139        ttl_seconds: i64,
1140    ) -> Result<(), StorageError> {
1141        let mut conn = self.redis.lock().await;
1142        redis::cmd("SETEX")
1143            .arg(format!("route:{}", entity))
1144            .arg(ttl_seconds.max(1) as usize)
1145            .arg(session_id)
1146            .query_async::<_, ()>(&mut *conn)
1147            .await
1148            .map_err(|_| StorageError::Redis)?;
1149        Ok(())
1150    }
1151
1152    /// Removes a routing entry from Redis.
1153    pub async fn clear_route(&self, entity: &str) -> Result<(), StorageError> {
1154        let mut conn = self.redis.lock().await;
1155        let _: () = redis::cmd("DEL")
1156            .arg(format!("route:{}", entity))
1157            .query_async::<_, ()>(&mut *conn)
1158            .await
1159            .map_err(|_| StorageError::Redis)?;
1160        Ok(())
1161    }
1162}
1163
1164fn generate_pair_code() -> String {
1165    let mut seed = [0u8; PAIRING_CODE_LENGTH];
1166    OsRng.fill_bytes(&mut seed);
1167    let mut output = String::with_capacity(PAIRING_CODE_LENGTH + 1);
1168    for (index, byte) in seed.iter().enumerate() {
1169        let symbol = PAIRING_ALPHABET[(*byte as usize) % PAIRING_ALPHABET.len()] as char;
1170        output.push(symbol);
1171        if index == (PAIRING_CODE_LENGTH / 2) - 1 {
1172            output.push('-');
1173        }
1174    }
1175    if output.ends_with('-') {
1176        output.pop();
1177    }
1178    output
1179}
1180
/// Unit tests for the embedded migrations, enum round-trips and pairing-code
/// format, plus an opt-in integration flow against live Postgres and Redis.
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use std::str::FromStr;

    /// The embedded init migration must define at least one table.
    #[test]
    fn init_sql_exists() {
        assert!(INIT_SQL.contains("CREATE TABLE"));
    }

    /// A pairing code has `PAIRING_CODE_LENGTH` alphabet symbols with a single
    /// hyphen at index `PAIRING_CODE_LENGTH / 2`.
    #[test]
    fn pairing_code_format() {
        let code = generate_pair_code();
        assert_eq!(code.len(), PAIRING_CODE_LENGTH + 1);
        assert!(code.contains('-'));
        let separator_at = PAIRING_CODE_LENGTH / 2;
        for (position, symbol) in code.bytes().enumerate() {
            if position == separator_at {
                assert_eq!(symbol, b'-');
            } else {
                assert!(PAIRING_ALPHABET.contains(&symbol));
            }
        }
    }

    /// The init migration mentions every relation the storage layer relies on.
    #[test]
    fn init_sql_declares_new_relations() {
        assert!(INIT_SQL.contains("device_key_event"));
        assert!(INIT_SQL.contains("chat_group"));
        assert!(INIT_SQL.contains("group_member"));
        assert!(INIT_SQL.contains("federation_peer"));
        assert!(INIT_SQL.contains("inbox_offset"));
    }

    /// The pairing migration mentions the `device_pairing` table.
    #[test]
    fn pairing_sql_declares_pairing_table() {
        assert!(PAIRING_SQL.contains("device_pairing"));
    }

    /// `GroupRole` converts to and from its textual form and rejects unknown text.
    #[test]
    fn group_role_roundtrip() {
        assert_eq!(GroupRole::Owner.as_str(), "owner");
        assert_eq!(GroupRole::from_str("admin").unwrap(), GroupRole::Admin);
        assert!(GroupRole::from_str("unknown").is_err());
    }

    /// `FederationPeerStatus` converts to and from its textual form and rejects
    /// unknown text.
    #[test]
    fn federation_status_roundtrip() {
        assert_eq!(FederationPeerStatus::Active.as_str(), "active");
        assert_eq!(
            FederationPeerStatus::from_str("pending").unwrap(),
            FederationPeerStatus::Pending
        );
        assert!(FederationPeerStatus::from_str("offline").is_err());
    }

    /// End-to-end flow against real backends.
    ///
    /// Skipped (reported as passing) unless both `COMMUCAT_TEST_PG_DSN` and
    /// `COMMUCAT_TEST_REDIS_URL` are set. Every identifier carries a
    /// nanosecond-timestamp suffix so repeated runs do not collide; the rows it
    /// creates are not cleaned up afterwards.
    #[tokio::test]
    async fn storage_integration_flow() -> Result<(), Box<dyn std::error::Error>> {
        let pg = match std::env::var("COMMUCAT_TEST_PG_DSN") {
            Ok(value) => value,
            Err(_) => {
                eprintln!("skipping storage_integration_flow: COMMUCAT_TEST_PG_DSN not set");
                return Ok(());
            }
        };
        let redis = match std::env::var("COMMUCAT_TEST_REDIS_URL") {
            Ok(value) => value,
            Err(_) => {
                eprintln!("skipping storage_integration_flow: COMMUCAT_TEST_REDIS_URL not set");
                return Ok(());
            }
        };
        let storage = connect(&pg, &redis).await?;
        storage.migrate().await?;
        let suffix = Utc::now().timestamp_nanos_opt().unwrap_or_default();

        // User, device and key-event round trip.
        let user_profile = NewUserProfile {
            user_id: format!("test-user-{}", suffix),
            handle: format!("tester{}", suffix),
            display_name: Some("Tester".to_string()),
            avatar_url: None,
        };
        let created = storage.create_user(&user_profile).await?;
        let device_id = format!("test-device-{}", suffix);
        let device_record = DeviceRecord {
            device_id: device_id.clone(),
            user_id: created.user_id.clone(),
            public_key: vec![1; 32],
            status: "active".to_string(),
            created_at: Utc::now(),
        };
        storage.upsert_device(&device_record).await?;
        let key_event = DeviceKeyEvent {
            event_id: format!("evt-{}", suffix),
            device_id: device_id.clone(),
            public_key: device_record.public_key.clone(),
            recorded_at: Utc::now(),
        };
        storage.record_device_key_event(&key_event).await?;
        let latest = storage
            .latest_device_key_event(&device_id)
            .await?
            .expect("expected key event");
        assert_eq!(latest.public_key.len(), 32);

        // Group membership: the owner device is expected to be listed alongside
        // the explicitly added member.
        let group = ChatGroup {
            group_id: format!("group-{}", suffix),
            owner_device: device_id.clone(),
            created_at: Utc::now(),
        };
        storage.create_group(&group).await?;
        let member = GroupMember {
            group_id: group.group_id.clone(),
            device_id: format!("peer-device-{}", suffix),
            role: GroupRole::Member,
            joined_at: Utc::now(),
        };
        storage.add_group_member(&member).await?;
        let members = storage.list_group_members(&group.group_id).await?;
        assert!(members.iter().any(|m| m.device_id == device_id));
        assert!(members.iter().any(|m| m.device_id == member.device_id));
        let memberships = storage.list_groups_for_device(&member.device_id).await?;
        assert_eq!(memberships.len(), 1);
        storage
            .remove_group_member(&group.group_id, &member.device_id)
            .await?;

        // Federation peer upsert followed by a load.
        let peer = FederationPeerRecord {
            domain: format!("peer{}.example", suffix),
            endpoint: "https://peer.example/federation".to_string(),
            public_key: [5u8; 32],
            status: FederationPeerStatus::Active,
            updated_at: Utc::now(),
        };
        storage.upsert_federation_peer(&peer).await?;
        let fetched = storage.load_federation_peer(&peer.domain).await?;
        assert_eq!(fetched.endpoint, peer.endpoint);

        // Inbox offset store followed by a read.
        let offset = InboxOffset {
            entity_id: device_id.clone(),
            channel_id: format!("inbox:{}", device_id),
            last_envelope_id: Some(format!("env-{}", suffix)),
            updated_at: Utc::now(),
        };
        storage.store_inbox_offset(&offset).await?;
        let loaded = storage
            .read_inbox_offset(&offset.entity_id, &offset.channel_id)
            .await?
            .expect("offset present");
        assert_eq!(loaded.last_envelope_id, offset.last_envelope_id);

        // Pairing: issue a ticket, claim it from a second device, then purge an
        // artificially expired row.
        let ticket = storage
            .create_pairing_token(&created.user_id, &device_id, 300)
            .await?;
        // Derived from the constant (symbols plus one hyphen) rather than a
        // hard-coded 9, so it tracks `PAIRING_CODE_LENGTH`.
        assert_eq!(ticket.pair_code.len(), PAIRING_CODE_LENGTH + 1);
        let paired_device = format!("paired-device-{}", suffix);
        let claim = storage
            .claim_pairing_token(&ticket.pair_code, &paired_device, &[7u8; 32])
            .await?;
        assert_eq!(claim.user.user_id, created.user_id);
        assert_eq!(claim.issuer_device_id, device_id);
        storage
            .client
            .execute(
                "INSERT INTO device_pairing (pair_code, user_id, issuer_device_id, issued_at, expires_at, attempts) VALUES ($1, $2, $3, now(), now() - interval '10 minutes', 0)",
                &[&format!("expired-{}", suffix), &created.user_id, &device_id],
            )
            .await
            .map_err(|_| StorageError::Postgres)?;
        let purged = storage.invalidate_expired_pairings().await?;
        assert!(purged >= 1);
        Ok(())
    }
}