commucat_storage/
lib.rs

1use chrono::{DateTime, Utc};
2use serde_json::Value;
3use std::convert::TryInto;
4use std::error::Error;
5use std::fmt::{Display, Formatter};
6use std::str::FromStr;
7use std::sync::Arc;
8use tokio::sync::Mutex;
9use tokio::task::JoinHandle;
10use tokio_postgres::{Client, NoTls};
11
12const INIT_SQL: &str = include_str!("../migrations/001_init.sql");
13
/// Failure categories reported by the storage layer.
///
/// Every variant is a payload-free marker, so values are cheap to clone and can be
/// compared directly (for example with `assert_eq!` or `matches!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StorageError {
    /// A PostgreSQL operation failed.
    Postgres,
    /// A Redis operation failed.
    Redis,
    /// Stored or received data could not be decoded into the expected shape.
    Serialization,
    /// The requested record does not exist.
    Missing,
}
21
22impl Display for StorageError {
23    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
24        match self {
25            Self::Postgres => write!(f, "postgres failure"),
26            Self::Redis => write!(f, "redis failure"),
27            Self::Serialization => write!(f, "serialization failure"),
28            Self::Missing => write!(f, "missing record"),
29        }
30    }
31}
32
33impl Error for StorageError {}
34
35pub struct Storage {
36    client: Client,
37    _pg_task: JoinHandle<()>,
38    redis: Arc<Mutex<redis::aio::MultiplexedConnection>>,
39}
40
/// Input for [`Storage::create_user`]: the caller-supplied columns of a new `app_user` row.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NewUserProfile {
    /// Value written to the `user_id` column.
    pub user_id: String,
    /// Value written to the `handle` column.
    pub handle: String,
    /// Optional value for the `display_name` column.
    pub display_name: Option<String>,
    /// Optional value for the `avatar_url` column.
    pub avatar_url: Option<String>,
}
48
49#[derive(Debug, Clone, PartialEq, Eq)]
50pub struct UserProfile {
51    pub user_id: String,
52    pub handle: String,
53    pub display_name: Option<String>,
54    pub avatar_url: Option<String>,
55    pub created_at: DateTime<Utc>,
56    pub updated_at: DateTime<Utc>,
57}
58
59#[derive(Debug, Clone, PartialEq, Eq)]
60pub struct DeviceRecord {
61    pub device_id: String,
62    pub user_id: String,
63    pub public_key: Vec<u8>,
64    pub status: String,
65    pub created_at: DateTime<Utc>,
66}
67
68#[derive(Debug, Clone, PartialEq, Eq)]
69pub struct SessionRecord {
70    pub session_id: String,
71    pub user_id: String,
72    pub device_id: String,
73    pub tls_fingerprint: String,
74    pub created_at: DateTime<Utc>,
75    pub ttl_seconds: i64,
76}
77
78#[derive(Debug, Clone, PartialEq, Eq)]
79pub struct RelayEnvelope {
80    pub envelope_id: String,
81    pub channel_id: String,
82    pub payload: Vec<u8>,
83    pub deliver_after: DateTime<Utc>,
84    pub expires_at: DateTime<Utc>,
85}
86
87#[derive(Debug, Clone, PartialEq, Eq)]
88pub struct IdempotencyKey {
89    pub key: String,
90    pub scope: String,
91    pub created_at: DateTime<Utc>,
92}
93
94#[derive(Debug, Clone, PartialEq, Eq)]
95pub struct PresenceSnapshot {
96    pub entity: String,
97    pub state: String,
98    pub expires_at: DateTime<Utc>,
99    pub user_id: Option<String>,
100    pub handle: Option<String>,
101    pub display_name: Option<String>,
102    pub avatar_url: Option<String>,
103}
104
105#[derive(Debug, Clone, PartialEq, Eq)]
106pub struct DeviceKeyEvent {
107    pub event_id: String,
108    pub device_id: String,
109    pub public_key: Vec<u8>,
110    pub recorded_at: DateTime<Utc>,
111}
112
113#[derive(Debug, Clone, PartialEq, Eq)]
114pub struct ChatGroup {
115    pub group_id: String,
116    pub owner_device: String,
117    pub created_at: DateTime<Utc>,
118}
119
/// Role of a device inside a chat group.
///
/// The enum has no payload, so it is `Copy` and can be used as a hash key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum GroupRole {
    /// Stored as `"owner"`.
    Owner,
    /// Stored as `"admin"`.
    Admin,
    /// Stored as `"member"`.
    Member,
}
126
127impl GroupRole {
128    fn as_str(&self) -> &'static str {
129        match self {
130            GroupRole::Owner => "owner",
131            GroupRole::Admin => "admin",
132            GroupRole::Member => "member",
133        }
134    }
135}
136
137impl FromStr for GroupRole {
138    type Err = StorageError;
139
140    fn from_str(value: &str) -> Result<Self, Self::Err> {
141        match value {
142            "owner" => Ok(GroupRole::Owner),
143            "admin" => Ok(GroupRole::Admin),
144            "member" => Ok(GroupRole::Member),
145            _ => Err(StorageError::Serialization),
146        }
147    }
148}
149
150#[derive(Debug, Clone, PartialEq, Eq)]
151pub struct GroupMember {
152    pub group_id: String,
153    pub device_id: String,
154    pub role: GroupRole,
155    pub joined_at: DateTime<Utc>,
156}
157
/// Status recorded for a federation peer.
///
/// The enum has no payload, so it is `Copy` and can be used as a hash key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FederationPeerStatus {
    /// Stored as `"active"`.
    Active,
    /// Stored as `"pending"`.
    Pending,
    /// Stored as `"blocked"`.
    Blocked,
}
164
165impl FederationPeerStatus {
166    fn as_str(&self) -> &'static str {
167        match self {
168            FederationPeerStatus::Active => "active",
169            FederationPeerStatus::Pending => "pending",
170            FederationPeerStatus::Blocked => "blocked",
171        }
172    }
173}
174
175impl FromStr for FederationPeerStatus {
176    type Err = StorageError;
177
178    fn from_str(value: &str) -> Result<Self, Self::Err> {
179        match value {
180            "active" => Ok(FederationPeerStatus::Active),
181            "pending" => Ok(FederationPeerStatus::Pending),
182            "blocked" => Ok(FederationPeerStatus::Blocked),
183            _ => Err(StorageError::Serialization),
184        }
185    }
186}
187
188#[derive(Debug, Clone, PartialEq, Eq)]
189pub struct FederationPeerRecord {
190    pub domain: String,
191    pub endpoint: String,
192    pub public_key: [u8; 32],
193    pub status: FederationPeerStatus,
194    pub updated_at: DateTime<Utc>,
195}
196
197#[derive(Debug, Clone, PartialEq, Eq)]
198pub struct InboxOffset {
199    pub entity_id: String,
200    pub channel_id: String,
201    pub last_envelope_id: Option<String>,
202    pub updated_at: DateTime<Utc>,
203}
204
205/// Establishes connectivity to PostgreSQL and Redis backends.
206pub async fn connect(postgres_dsn: &str, redis_url: &str) -> Result<Storage, StorageError> {
207    let (client, connection) = tokio_postgres::connect(postgres_dsn, NoTls)
208        .await
209        .map_err(|_| StorageError::Postgres)?;
210    let task = tokio::spawn(async move {
211        if let Err(error) = connection.await {
212            tracing::error!("postgres connection stopped: {}", error);
213        }
214    });
215    let redis_client = redis::Client::open(redis_url).map_err(|_| StorageError::Redis)?;
216    let redis_connection = redis_client
217        .get_multiplexed_async_connection()
218        .await
219        .map_err(|_| StorageError::Redis)?;
220    Ok(Storage {
221        client,
222        _pg_task: task,
223        redis: Arc::new(Mutex::new(redis_connection)),
224    })
225}
226
227impl Storage {
228    /// Applies bundled migrations to PostgreSQL.
229    pub async fn migrate(&self) -> Result<(), StorageError> {
230        self.client
231            .batch_execute(INIT_SQL)
232            .await
233            .map_err(|_| StorageError::Postgres)
234    }
235
236    /// Executes lightweight probes across PostgreSQL and Redis.
237    pub async fn readiness(&self) -> Result<(), StorageError> {
238        self.client
239            .simple_query("SELECT 1")
240            .await
241            .map_err(|_| StorageError::Postgres)?;
242        let mut conn = self.redis.lock().await;
243        let _: String = redis::cmd("PING")
244            .query_async::<_, String>(&mut *conn)
245            .await
246            .map_err(|_| StorageError::Redis)?;
247        Ok(())
248    }
249
250    /// Registers or rotates a device key.
251    pub async fn upsert_device(&self, record: &DeviceRecord) -> Result<(), StorageError> {
252        let query = "INSERT INTO user_device (opaque_id, user_id, pubkey, status, created_at) VALUES ($1, $2, $3, $4, $5)
253            ON CONFLICT (opaque_id) DO UPDATE SET pubkey = excluded.pubkey, status = excluded.status
254            WHERE user_device.user_id = excluded.user_id";
255        self.client
256            .execute(
257                query,
258                &[
259                    &record.device_id,
260                    &record.user_id,
261                    &record.public_key,
262                    &record.status,
263                    &record.created_at,
264                ],
265            )
266            .await
267            .map_err(|_| StorageError::Postgres)?;
268        Ok(())
269    }
270
271    /// Persists an audit trail entry for device key material.
272    pub async fn record_device_key_event(
273        &self,
274        event: &DeviceKeyEvent,
275    ) -> Result<(), StorageError> {
276        let query = "INSERT INTO device_key_event (event_id, device_id, public_key, recorded_at) VALUES ($1, $2, $3, $4)";
277        self.client
278            .execute(
279                query,
280                &[
281                    &event.event_id,
282                    &event.device_id,
283                    &event.public_key,
284                    &event.recorded_at,
285                ],
286            )
287            .await
288            .map_err(|_| StorageError::Postgres)?;
289        Ok(())
290    }
291
292    /// Fetches the newest device key event for a device identifier.
293    pub async fn latest_device_key_event(
294        &self,
295        device_id: &str,
296    ) -> Result<Option<DeviceKeyEvent>, StorageError> {
297        let query = "SELECT event_id, device_id, public_key, recorded_at
298            FROM device_key_event WHERE device_id = $1 ORDER BY recorded_at DESC LIMIT 1";
299        let row = self
300            .client
301            .query_opt(query, &[&device_id])
302            .await
303            .map_err(|_| StorageError::Postgres)?;
304        Ok(row.map(|row| DeviceKeyEvent {
305            event_id: row.get(0),
306            device_id: row.get(1),
307            public_key: row.get(2),
308            recorded_at: row.get(3),
309        }))
310    }
311
312    /// Creates a session binding a device to a TLS fingerprint.
313    pub async fn record_session(&self, session: &SessionRecord) -> Result<(), StorageError> {
314        let query =
315            "INSERT INTO session (opaque_id, user_id, device_id, tls_fingerprint, created_at, ttl_seconds)
316            VALUES ($1, $2, $3, $4, $5, $6)";
317        self.client
318            .execute(
319                query,
320                &[
321                    &session.session_id,
322                    &session.user_id,
323                    &session.device_id,
324                    &session.tls_fingerprint,
325                    &session.created_at,
326                    &session.ttl_seconds,
327                ],
328            )
329            .await
330            .map_err(|_| StorageError::Postgres)?;
331        Ok(())
332    }
333
334    /// Fetches device metadata by identifier.
335    pub async fn load_device(&self, device_id: &str) -> Result<DeviceRecord, StorageError> {
336        let row = self
337            .client
338            .query_opt(
339                "SELECT opaque_id, user_id, pubkey, status, created_at FROM user_device WHERE opaque_id = $1",
340                &[&device_id],
341            )
342            .await
343            .map_err(|_| StorageError::Postgres)?;
344        let row = row.ok_or(StorageError::Missing)?;
345        Ok(DeviceRecord {
346            device_id: row.get(0),
347            user_id: row.get(1),
348            public_key: row.get(2),
349            status: row.get(3),
350            created_at: row.get(4),
351        })
352    }
353
354    /// Creates a new user profile entry.
355    pub async fn create_user(&self, profile: &NewUserProfile) -> Result<UserProfile, StorageError> {
356        let now = Utc::now();
357        let row = self
358            .client
359            .query_one(
360                "INSERT INTO app_user (user_id, handle, display_name, avatar_url, created_at, updated_at)
361                VALUES ($1, $2, $3, $4, $5, $5)
362                RETURNING user_id, handle, display_name, avatar_url, created_at, updated_at",
363                &[
364                    &profile.user_id,
365                    &profile.handle,
366                    &profile.display_name,
367                    &profile.avatar_url,
368                    &now,
369                ],
370            )
371            .await
372            .map_err(|_| StorageError::Postgres)?;
373        Ok(UserProfile {
374            user_id: row.get(0),
375            handle: row.get(1),
376            display_name: row.get(2),
377            avatar_url: row.get(3),
378            created_at: row.get(4),
379            updated_at: row.get(5),
380        })
381    }
382
383    /// Loads a user profile by identifier.
384    pub async fn load_user(&self, user_id: &str) -> Result<UserProfile, StorageError> {
385        let row = self
386            .client
387            .query_opt(
388                "SELECT user_id, handle, display_name, avatar_url, created_at, updated_at FROM app_user WHERE user_id = $1",
389                &[&user_id],
390            )
391            .await
392            .map_err(|_| StorageError::Postgres)?;
393        let row = row.ok_or(StorageError::Missing)?;
394        Ok(UserProfile {
395            user_id: row.get(0),
396            handle: row.get(1),
397            display_name: row.get(2),
398            avatar_url: row.get(3),
399            created_at: row.get(4),
400            updated_at: row.get(5),
401        })
402    }
403
404    /// Loads a user profile by handle.
405    pub async fn load_user_by_handle(&self, handle: &str) -> Result<UserProfile, StorageError> {
406        let row = self
407            .client
408            .query_opt(
409                "SELECT user_id, handle, display_name, avatar_url, created_at, updated_at FROM app_user WHERE handle = $1",
410                &[&handle],
411            )
412            .await
413            .map_err(|_| StorageError::Postgres)?;
414        let row = row.ok_or(StorageError::Missing)?;
415        Ok(UserProfile {
416            user_id: row.get(0),
417            handle: row.get(1),
418            display_name: row.get(2),
419            avatar_url: row.get(3),
420            created_at: row.get(4),
421            updated_at: row.get(5),
422        })
423    }
424
425    /// Applies partial updates to user profile metadata.
426    pub async fn update_user_profile(
427        &self,
428        user_id: &str,
429        display_name: Option<&str>,
430        avatar_url: Option<&str>,
431    ) -> Result<(), StorageError> {
432        let now = Utc::now();
433        let affected = self
434            .client
435            .execute(
436                "UPDATE app_user SET display_name = COALESCE($2, display_name), avatar_url = COALESCE($3, avatar_url), updated_at = $4 WHERE user_id = $1",
437                &[&user_id, &display_name, &avatar_url, &now],
438            )
439            .await
440            .map_err(|_| StorageError::Postgres)?;
441        if affected == 0 {
442            return Err(StorageError::Missing);
443        }
444        Ok(())
445    }
446
447    /// Creates a chat group entry and enrolls the owner as a member.
448    pub async fn create_group(&self, group: &ChatGroup) -> Result<(), StorageError> {
449        self.client
450            .execute(
451                "INSERT INTO chat_group (group_id, owner_device, created_at) VALUES ($1, $2, $3)
452                ON CONFLICT (group_id) DO NOTHING",
453                &[&group.group_id, &group.owner_device, &group.created_at],
454            )
455            .await
456            .map_err(|_| StorageError::Postgres)?;
457        self.client
458            .execute(
459                "INSERT INTO group_member (group_id, device_id, role, joined_at) VALUES ($1, $2, $3, $4)
460                ON CONFLICT (group_id, device_id) DO UPDATE SET role = excluded.role",
461                &[
462                    &group.group_id,
463                    &group.owner_device,
464                    &GroupRole::Owner.as_str(),
465                    &group.created_at,
466                ],
467            )
468            .await
469            .map_err(|_| StorageError::Postgres)?;
470        Ok(())
471    }
472
473    /// Adds or updates group membership information.
474    pub async fn add_group_member(&self, member: &GroupMember) -> Result<(), StorageError> {
475        let query = "INSERT INTO group_member (group_id, device_id, role, joined_at) VALUES ($1, $2, $3, $4)
476            ON CONFLICT (group_id, device_id) DO UPDATE SET role = excluded.role, joined_at = excluded.joined_at";
477        self.client
478            .execute(
479                query,
480                &[
481                    &member.group_id,
482                    &member.device_id,
483                    &member.role.as_str(),
484                    &member.joined_at,
485                ],
486            )
487            .await
488            .map_err(|_| StorageError::Postgres)?;
489        Ok(())
490    }
491
492    /// Removes a member from the given group.
493    pub async fn remove_group_member(
494        &self,
495        group_id: &str,
496        device_id: &str,
497    ) -> Result<(), StorageError> {
498        let affected = self
499            .client
500            .execute(
501                "DELETE FROM group_member WHERE group_id = $1 AND device_id = $2",
502                &[&group_id, &device_id],
503            )
504            .await
505            .map_err(|_| StorageError::Postgres)?;
506        if affected == 0 {
507            return Err(StorageError::Missing);
508        }
509        Ok(())
510    }
511
512    /// Lists all members of a group ordered by join time.
513    pub async fn list_group_members(
514        &self,
515        group_id: &str,
516    ) -> Result<Vec<GroupMember>, StorageError> {
517        let rows = self
518            .client
519            .query(
520                "SELECT group_id, device_id, role, joined_at FROM group_member WHERE group_id = $1 ORDER BY joined_at ASC",
521                &[&group_id],
522            )
523            .await
524            .map_err(|_| StorageError::Postgres)?;
525        let mut members = Vec::with_capacity(rows.len());
526        for row in rows {
527            let role: String = row.get(2);
528            let parsed = GroupRole::from_str(role.as_str())?;
529            members.push(GroupMember {
530                group_id: row.get(0),
531                device_id: row.get(1),
532                role: parsed,
533                joined_at: row.get(3),
534            });
535        }
536        Ok(members)
537    }
538
539    /// Loads group metadata by identifier.
540    pub async fn load_group(&self, group_id: &str) -> Result<ChatGroup, StorageError> {
541        let row = self
542            .client
543            .query_opt(
544                "SELECT group_id, owner_device, created_at FROM chat_group WHERE group_id = $1",
545                &[&group_id],
546            )
547            .await
548            .map_err(|_| StorageError::Postgres)?;
549        let row = row.ok_or(StorageError::Missing)?;
550        Ok(ChatGroup {
551            group_id: row.get(0),
552            owner_device: row.get(1),
553            created_at: row.get(2),
554        })
555    }
556
557    /// Lists groups that include the target device.
558    pub async fn list_groups_for_device(
559        &self,
560        device_id: &str,
561    ) -> Result<Vec<ChatGroup>, StorageError> {
562        let rows = self
563            .client
564            .query(
565                "SELECT g.group_id, g.owner_device, g.created_at FROM chat_group g
566                INNER JOIN group_member m ON g.group_id = m.group_id
567                WHERE m.device_id = $1 ORDER BY g.created_at ASC",
568                &[&device_id],
569            )
570            .await
571            .map_err(|_| StorageError::Postgres)?;
572        Ok(rows
573            .into_iter()
574            .map(|row| ChatGroup {
575                group_id: row.get(0),
576                owner_device: row.get(1),
577                created_at: row.get(2),
578            })
579            .collect())
580    }
581
582    /// Upserts federation peer descriptors for S2S routing.
583    pub async fn upsert_federation_peer(
584        &self,
585        peer: &FederationPeerRecord,
586    ) -> Result<(), StorageError> {
587        let query = "INSERT INTO federation_peer (domain, endpoint, public_key, status, updated_at)
588            VALUES ($1, $2, $3, $4, $5)
589            ON CONFLICT (domain) DO UPDATE SET endpoint = excluded.endpoint, public_key = excluded.public_key, status = excluded.status, updated_at = excluded.updated_at";
590        self.client
591            .execute(
592                query,
593                &[
594                    &peer.domain,
595                    &peer.endpoint,
596                    &peer.public_key.as_slice(),
597                    &peer.status.as_str(),
598                    &peer.updated_at,
599                ],
600            )
601            .await
602            .map_err(|_| StorageError::Postgres)?;
603        Ok(())
604    }
605
606    /// Loads a federation peer by domain.
607    pub async fn load_federation_peer(
608        &self,
609        domain: &str,
610    ) -> Result<FederationPeerRecord, StorageError> {
611        let row = self
612            .client
613            .query_opt(
614                "SELECT domain, endpoint, public_key, status, updated_at FROM federation_peer WHERE domain = $1",
615                &[&domain],
616            )
617            .await
618            .map_err(|_| StorageError::Postgres)?;
619        let row = row.ok_or(StorageError::Missing)?;
620        let key: Vec<u8> = row.get(2);
621        let status: String = row.get(3);
622        let status = FederationPeerStatus::from_str(status.as_str())?;
623        let public_key: [u8; 32] = key
624            .as_slice()
625            .try_into()
626            .map_err(|_| StorageError::Serialization)?;
627        Ok(FederationPeerRecord {
628            domain: row.get(0),
629            endpoint: row.get(1),
630            public_key,
631            status,
632            updated_at: row.get(4),
633        })
634    }
635
636    /// Enumerates all known federation peers.
637    pub async fn list_federation_peers(&self) -> Result<Vec<FederationPeerRecord>, StorageError> {
638        let rows = self
639            .client
640            .query(
641                "SELECT domain, endpoint, public_key, status, updated_at FROM federation_peer",
642                &[],
643            )
644            .await
645            .map_err(|_| StorageError::Postgres)?;
646        let mut peers = Vec::with_capacity(rows.len());
647        for row in rows {
648            let key: Vec<u8> = row.get(2);
649            let status: String = row.get(3);
650            let status = FederationPeerStatus::from_str(status.as_str())?;
651            let public_key: [u8; 32] = key
652                .as_slice()
653                .try_into()
654                .map_err(|_| StorageError::Serialization)?;
655            peers.push(FederationPeerRecord {
656                domain: row.get(0),
657                endpoint: row.get(1),
658                public_key,
659                status,
660                updated_at: row.get(4),
661            });
662        }
663        Ok(peers)
664    }
665
666    /// Sets the peer status and refresh timestamp.
667    pub async fn set_federation_peer_status(
668        &self,
669        domain: &str,
670        status: FederationPeerStatus,
671    ) -> Result<(), StorageError> {
672        let now = Utc::now();
673        let affected = self
674            .client
675            .execute(
676                "UPDATE federation_peer SET status = $2, updated_at = $3 WHERE domain = $1",
677                &[&domain, &status.as_str(), &now],
678            )
679            .await
680            .map_err(|_| StorageError::Postgres)?;
681        if affected == 0 {
682            return Err(StorageError::Missing);
683        }
684        Ok(())
685    }
686
687    /// Schedules an encrypted relay envelope for delivery.
688    pub async fn enqueue_relay(&self, envelope: &RelayEnvelope) -> Result<(), StorageError> {
689        let query =
690            "INSERT INTO relay_queue (envelope_id, channel_id, payload, deliver_after, expires_at)
691            VALUES ($1, $2, $3, $4, $5)";
692        self.client
693            .execute(
694                query,
695                &[
696                    &envelope.envelope_id,
697                    &envelope.channel_id,
698                    &envelope.payload,
699                    &envelope.deliver_after,
700                    &envelope.expires_at,
701                ],
702            )
703            .await
704            .map_err(|_| StorageError::Postgres)?;
705        Ok(())
706    }
707
708    /// Claims pending relay envelopes for a channel.
709    pub async fn claim_envelopes(
710        &self,
711        channel_id: &str,
712        limit: i64,
713    ) -> Result<Vec<RelayEnvelope>, StorageError> {
714        let query = "DELETE FROM relay_queue
715            WHERE envelope_id IN (
716                SELECT envelope_id FROM relay_queue
717                WHERE channel_id = $1 AND deliver_after <= now()
718                ORDER BY deliver_after ASC
719                LIMIT $2
720            )
721            RETURNING envelope_id, channel_id, payload, deliver_after, expires_at";
722        let rows = self
723            .client
724            .query(query, &[&channel_id, &limit])
725            .await
726            .map_err(|_| StorageError::Postgres)?;
727        Ok(rows
728            .into_iter()
729            .map(|row| RelayEnvelope {
730                envelope_id: row.get(0),
731                channel_id: row.get(1),
732                payload: row.get(2),
733                deliver_after: row.get(3),
734                expires_at: row.get(4),
735            })
736            .collect())
737    }
738
739    /// Stores the last delivered envelope reference for an entity/channel pair.
740    pub async fn store_inbox_offset(&self, offset: &InboxOffset) -> Result<(), StorageError> {
741        let query = "INSERT INTO inbox_offset (entity_id, channel_id, last_envelope_id, updated_at)
742            VALUES ($1, $2, $3, $4)
743            ON CONFLICT (entity_id, channel_id) DO UPDATE SET last_envelope_id = excluded.last_envelope_id, updated_at = excluded.updated_at";
744        self.client
745            .execute(
746                query,
747                &[
748                    &offset.entity_id,
749                    &offset.channel_id,
750                    &offset.last_envelope_id,
751                    &offset.updated_at,
752                ],
753            )
754            .await
755            .map_err(|_| StorageError::Postgres)?;
756        Ok(())
757    }
758
759    /// Reads the stored inbox offset if present.
760    pub async fn read_inbox_offset(
761        &self,
762        entity_id: &str,
763        channel_id: &str,
764    ) -> Result<Option<InboxOffset>, StorageError> {
765        let row = self
766            .client
767            .query_opt(
768                "SELECT entity_id, channel_id, last_envelope_id, updated_at FROM inbox_offset WHERE entity_id = $1 AND channel_id = $2",
769                &[&entity_id, &channel_id],
770            )
771            .await
772            .map_err(|_| StorageError::Postgres)?;
773        Ok(row.map(|row| InboxOffset {
774            entity_id: row.get(0),
775            channel_id: row.get(1),
776            last_envelope_id: row.get(2),
777            updated_at: row.get(3),
778        }))
779    }
780
781    /// Records an idempotency key for deduplication.
782    pub async fn store_idempotency(&self, key: &IdempotencyKey) -> Result<(), StorageError> {
783        let query = "INSERT INTO idempotency (key, scope, created_at) VALUES ($1, $2, $3)
784            ON CONFLICT (key, scope) DO NOTHING";
785        self.client
786            .execute(query, &[&key.key, &key.scope, &key.created_at])
787            .await
788            .map_err(|_| StorageError::Postgres)?;
789        Ok(())
790    }
791
792    /// Publishes local presence information into Redis.
793    pub async fn publish_presence(&self, snapshot: &PresenceSnapshot) -> Result<(), StorageError> {
794        let mut conn = self.redis.lock().await;
795        let ttl = (snapshot.expires_at.timestamp() - Utc::now().timestamp()).max(1) as usize;
796        let payload = serde_json::json!({
797            "entity": snapshot.entity,
798            "state": snapshot.state,
799            "expires_at": snapshot.expires_at.to_rfc3339(),
800            "user": snapshot.user_id.as_ref().map(|id| serde_json::json!({
801                "id": id,
802                "handle": snapshot.handle.clone(),
803                "display_name": snapshot.display_name.clone(),
804                "avatar_url": snapshot.avatar_url.clone(),
805            })),
806        })
807        .to_string();
808        redis::cmd("SETEX")
809            .arg(format!("presence:{}", snapshot.entity))
810            .arg(ttl)
811            .arg(payload)
812            .query_async::<_, ()>(&mut *conn)
813            .await
814            .map_err(|_| StorageError::Redis)?;
815        Ok(())
816    }
817
818    /// Reads presence state from Redis.
819    pub async fn read_presence(
820        &self,
821        entity: &str,
822    ) -> Result<Option<PresenceSnapshot>, StorageError> {
823        let mut conn = self.redis.lock().await;
824        let value: Option<String> = redis::cmd("GET")
825            .arg(format!("presence:{}", entity))
826            .query_async::<_, Option<String>>(&mut *conn)
827            .await
828            .map_err(|_| StorageError::Redis)?;
829        if let Some(json) = value {
830            let parsed: Value =
831                serde_json::from_str(&json).map_err(|_| StorageError::Serialization)?;
832            let state = parsed
833                .get("state")
834                .and_then(|v| v.as_str())
835                .unwrap_or("online")
836                .to_string();
837            let expires = parsed
838                .get("expires_at")
839                .and_then(|v| v.as_str())
840                .ok_or(StorageError::Serialization)?;
841            let expires = DateTime::parse_from_rfc3339(expires)
842                .map_err(|_| StorageError::Serialization)?
843                .with_timezone(&Utc);
844            let user_obj = parsed.get("user").and_then(|v| v.as_object());
845            let user_id = user_obj
846                .and_then(|map| map.get("id"))
847                .and_then(|v| v.as_str())
848                .map(|v| v.to_string());
849            let handle = user_obj
850                .and_then(|map| map.get("handle"))
851                .and_then(|v| v.as_str())
852                .map(|v| v.to_string());
853            let display_name = user_obj
854                .and_then(|map| map.get("display_name"))
855                .and_then(|v| v.as_str())
856                .map(|v| v.to_string());
857            let avatar_url = user_obj
858                .and_then(|map| map.get("avatar_url"))
859                .and_then(|v| v.as_str())
860                .map(|v| v.to_string());
861            Ok(Some(PresenceSnapshot {
862                entity: entity.to_string(),
863                state,
864                expires_at: expires,
865                user_id,
866                handle,
867                display_name,
868                avatar_url,
869            }))
870        } else {
871            Ok(None)
872        }
873    }
874
875    /// Registers a routing entry in Redis for direct message delivery.
876    pub async fn register_route(
877        &self,
878        entity: &str,
879        session_id: &str,
880        ttl_seconds: i64,
881    ) -> Result<(), StorageError> {
882        let mut conn = self.redis.lock().await;
883        redis::cmd("SETEX")
884            .arg(format!("route:{}", entity))
885            .arg(ttl_seconds.max(1) as usize)
886            .arg(session_id)
887            .query_async::<_, ()>(&mut *conn)
888            .await
889            .map_err(|_| StorageError::Redis)?;
890        Ok(())
891    }
892
893    /// Removes a routing entry from Redis.
894    pub async fn clear_route(&self, entity: &str) -> Result<(), StorageError> {
895        let mut conn = self.redis.lock().await;
896        let _: () = redis::cmd("DEL")
897            .arg(format!("route:{}", entity))
898            .query_async::<_, ()>(&mut *conn)
899            .await
900            .map_err(|_| StorageError::Redis)?;
901        Ok(())
902    }
903}
904
905#[cfg(test)]
906mod tests {
907    use super::*;
908    use chrono::Utc;
909    use std::str::FromStr;
910
911    #[test]
912    fn init_sql_exists() {
913        assert!(INIT_SQL.contains("CREATE TABLE"));
914    }
915
916    #[test]
917    fn init_sql_declares_new_relations() {
918        assert!(INIT_SQL.contains("device_key_event"));
919        assert!(INIT_SQL.contains("chat_group"));
920        assert!(INIT_SQL.contains("group_member"));
921        assert!(INIT_SQL.contains("federation_peer"));
922        assert!(INIT_SQL.contains("inbox_offset"));
923    }
924
925    #[test]
926    fn group_role_roundtrip() {
927        assert_eq!(GroupRole::Owner.as_str(), "owner");
928        assert_eq!(GroupRole::from_str("admin").unwrap(), GroupRole::Admin);
929        assert!(GroupRole::from_str("unknown").is_err());
930    }
931
932    #[test]
933    fn federation_status_roundtrip() {
934        assert_eq!(FederationPeerStatus::Active.as_str(), "active");
935        assert_eq!(
936            FederationPeerStatus::from_str("pending").unwrap(),
937            FederationPeerStatus::Pending
938        );
939        assert!(FederationPeerStatus::from_str("offline").is_err());
940    }
941
942    #[tokio::test]
943    async fn storage_integration_flow() -> Result<(), Box<dyn std::error::Error>> {
944        let pg = match std::env::var("COMMUCAT_TEST_PG_DSN") {
945            Ok(value) => value,
946            Err(_) => {
947                eprintln!("skipping storage_integration_flow: COMMUCAT_TEST_PG_DSN not set");
948                return Ok(());
949            }
950        };
951        let redis = match std::env::var("COMMUCAT_TEST_REDIS_URL") {
952            Ok(value) => value,
953            Err(_) => {
954                eprintln!("skipping storage_integration_flow: COMMUCAT_TEST_REDIS_URL not set");
955                return Ok(());
956            }
957        };
958        let storage = connect(&pg, &redis).await?;
959        storage.migrate().await?;
960        let suffix = Utc::now().timestamp_nanos_opt().unwrap_or_default();
961        let user_profile = NewUserProfile {
962            user_id: format!("test-user-{}", suffix),
963            handle: format!("tester{}", suffix),
964            display_name: Some("Tester".to_string()),
965            avatar_url: None,
966        };
967        let created = storage.create_user(&user_profile).await?;
968        let device_id = format!("test-device-{}", suffix);
969        let device_record = DeviceRecord {
970            device_id: device_id.clone(),
971            user_id: created.user_id.clone(),
972            public_key: vec![1; 32],
973            status: "active".to_string(),
974            created_at: Utc::now(),
975        };
976        storage.upsert_device(&device_record).await?;
977        let key_event = DeviceKeyEvent {
978            event_id: format!("evt-{}", suffix),
979            device_id: device_id.clone(),
980            public_key: device_record.public_key.clone(),
981            recorded_at: Utc::now(),
982        };
983        storage.record_device_key_event(&key_event).await?;
984        let latest = storage
985            .latest_device_key_event(&device_id)
986            .await?
987            .expect("expected key event");
988        assert_eq!(latest.public_key.len(), 32);
989
990        let group = ChatGroup {
991            group_id: format!("group-{}", suffix),
992            owner_device: device_id.clone(),
993            created_at: Utc::now(),
994        };
995        storage.create_group(&group).await?;
996        let member = GroupMember {
997            group_id: group.group_id.clone(),
998            device_id: format!("peer-device-{}", suffix),
999            role: GroupRole::Member,
1000            joined_at: Utc::now(),
1001        };
1002        storage.add_group_member(&member).await?;
1003        let members = storage.list_group_members(&group.group_id).await?;
1004        assert!(members.iter().any(|m| m.device_id == device_id));
1005        assert!(members.iter().any(|m| m.device_id == member.device_id));
1006        let memberships = storage.list_groups_for_device(&member.device_id).await?;
1007        assert_eq!(memberships.len(), 1);
1008        storage
1009            .remove_group_member(&group.group_id, &member.device_id)
1010            .await?;
1011
1012        let peer = FederationPeerRecord {
1013            domain: format!("peer{}.example", suffix),
1014            endpoint: "https://peer.example/federation".to_string(),
1015            public_key: [5u8; 32],
1016            status: FederationPeerStatus::Active,
1017            updated_at: Utc::now(),
1018        };
1019        storage.upsert_federation_peer(&peer).await?;
1020        let fetched = storage.load_federation_peer(&peer.domain).await?;
1021        assert_eq!(fetched.endpoint, peer.endpoint);
1022
1023        let offset = InboxOffset {
1024            entity_id: device_id.clone(),
1025            channel_id: format!("inbox:{}", device_id),
1026            last_envelope_id: Some(format!("env-{}", suffix)),
1027            updated_at: Utc::now(),
1028        };
1029        storage.store_inbox_offset(&offset).await?;
1030        let loaded = storage
1031            .read_inbox_offset(&offset.entity_id, &offset.channel_id)
1032            .await?
1033            .expect("offset present");
1034        assert_eq!(loaded.last_envelope_id, offset.last_envelope_id);
1035        Ok(())
1036    }
1037}