1use chrono::{DateTime, Duration, Utc};
2use rand::{rngs::OsRng, RngCore};
3use serde_json::Value;
4use std::convert::TryInto;
5use std::error::Error;
6use std::fmt::{Display, Formatter};
7use std::str::FromStr;
8use std::sync::Arc;
9use tokio::sync::Mutex;
10use tokio::task::JoinHandle;
11use tokio_postgres::{Client, NoTls};
12
13const INIT_SQL: &str = include_str!("../migrations/001_init.sql");
14const PAIRING_SQL: &str = include_str!("../migrations/002_pairing.sql");
15const PAIRING_MAX_ATTEMPTS: i32 = 5;
16const PAIRING_CODE_LENGTH: usize = 8;
17const PAIRING_ALPHABET: &[u8] = b"ABCDEFGHJKMNPQRSTUVWXYZ23456789";
18
19#[derive(Debug)]
20pub enum StorageError {
21 Postgres,
22 Redis,
23 Serialization,
24 Missing,
25 Invalid,
26}
27
28impl Display for StorageError {
29 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
30 match self {
31 Self::Postgres => write!(f, "postgres failure"),
32 Self::Redis => write!(f, "redis failure"),
33 Self::Serialization => write!(f, "serialization failure"),
34 Self::Missing => write!(f, "missing record"),
35 Self::Invalid => write!(f, "invalid state"),
36 }
37 }
38}
39
40impl Error for StorageError {}
41
42pub struct Storage {
43 client: Client,
44 _pg_task: JoinHandle<()>,
45 redis: Arc<Mutex<redis::aio::MultiplexedConnection>>,
46}
47
48#[derive(Debug, Clone, PartialEq, Eq)]
49pub struct NewUserProfile {
50 pub user_id: String,
51 pub handle: String,
52 pub display_name: Option<String>,
53 pub avatar_url: Option<String>,
54}
55
56#[derive(Debug, Clone, PartialEq, Eq)]
57pub struct UserProfile {
58 pub user_id: String,
59 pub handle: String,
60 pub display_name: Option<String>,
61 pub avatar_url: Option<String>,
62 pub created_at: DateTime<Utc>,
63 pub updated_at: DateTime<Utc>,
64}
65
66#[derive(Debug, Clone, PartialEq, Eq)]
67pub struct DeviceRecord {
68 pub device_id: String,
69 pub user_id: String,
70 pub public_key: Vec<u8>,
71 pub status: String,
72 pub created_at: DateTime<Utc>,
73}
74
75#[derive(Debug, Clone, PartialEq, Eq)]
76pub struct SessionRecord {
77 pub session_id: String,
78 pub user_id: String,
79 pub device_id: String,
80 pub tls_fingerprint: String,
81 pub created_at: DateTime<Utc>,
82 pub ttl_seconds: i64,
83}
84
85#[derive(Debug, Clone, PartialEq, Eq)]
86pub struct RelayEnvelope {
87 pub envelope_id: String,
88 pub channel_id: String,
89 pub payload: Vec<u8>,
90 pub deliver_after: DateTime<Utc>,
91 pub expires_at: DateTime<Utc>,
92}
93
94#[derive(Debug, Clone, PartialEq, Eq)]
95pub struct IdempotencyKey {
96 pub key: String,
97 pub scope: String,
98 pub created_at: DateTime<Utc>,
99}
100
101#[derive(Debug, Clone, PartialEq, Eq)]
102pub struct PresenceSnapshot {
103 pub entity: String,
104 pub state: String,
105 pub expires_at: DateTime<Utc>,
106 pub user_id: Option<String>,
107 pub handle: Option<String>,
108 pub display_name: Option<String>,
109 pub avatar_url: Option<String>,
110}
111
112#[derive(Debug, Clone, PartialEq, Eq)]
113pub struct DeviceKeyEvent {
114 pub event_id: String,
115 pub device_id: String,
116 pub public_key: Vec<u8>,
117 pub recorded_at: DateTime<Utc>,
118}
119
120#[derive(Debug, Clone, PartialEq, Eq)]
121pub struct PairingTokenIssued {
122 pub pair_code: String,
123 pub issued_at: DateTime<Utc>,
124 pub expires_at: DateTime<Utc>,
125}
126
127#[derive(Debug, Clone, PartialEq, Eq)]
128pub struct PairingClaimResult {
129 pub user: UserProfile,
130 pub issuer_device_id: String,
131}
132
133#[derive(Debug, Clone, PartialEq, Eq)]
134pub struct ChatGroup {
135 pub group_id: String,
136 pub owner_device: String,
137 pub created_at: DateTime<Utc>,
138}
139
140#[derive(Debug, Clone, PartialEq, Eq)]
141pub enum GroupRole {
142 Owner,
143 Admin,
144 Member,
145}
146
147impl GroupRole {
148 fn as_str(&self) -> &'static str {
149 match self {
150 GroupRole::Owner => "owner",
151 GroupRole::Admin => "admin",
152 GroupRole::Member => "member",
153 }
154 }
155}
156
157impl FromStr for GroupRole {
158 type Err = StorageError;
159
160 fn from_str(value: &str) -> Result<Self, Self::Err> {
161 match value {
162 "owner" => Ok(GroupRole::Owner),
163 "admin" => Ok(GroupRole::Admin),
164 "member" => Ok(GroupRole::Member),
165 _ => Err(StorageError::Serialization),
166 }
167 }
168}
169
170#[derive(Debug, Clone, PartialEq, Eq)]
171pub struct GroupMember {
172 pub group_id: String,
173 pub device_id: String,
174 pub role: GroupRole,
175 pub joined_at: DateTime<Utc>,
176}
177
178#[derive(Debug, Clone, PartialEq, Eq)]
179pub enum FederationPeerStatus {
180 Active,
181 Pending,
182 Blocked,
183}
184
185impl FederationPeerStatus {
186 fn as_str(&self) -> &'static str {
187 match self {
188 FederationPeerStatus::Active => "active",
189 FederationPeerStatus::Pending => "pending",
190 FederationPeerStatus::Blocked => "blocked",
191 }
192 }
193}
194
195impl FromStr for FederationPeerStatus {
196 type Err = StorageError;
197
198 fn from_str(value: &str) -> Result<Self, Self::Err> {
199 match value {
200 "active" => Ok(FederationPeerStatus::Active),
201 "pending" => Ok(FederationPeerStatus::Pending),
202 "blocked" => Ok(FederationPeerStatus::Blocked),
203 _ => Err(StorageError::Serialization),
204 }
205 }
206}
207
208#[derive(Debug, Clone, PartialEq, Eq)]
209pub struct FederationPeerRecord {
210 pub domain: String,
211 pub endpoint: String,
212 pub public_key: [u8; 32],
213 pub status: FederationPeerStatus,
214 pub updated_at: DateTime<Utc>,
215}
216
217#[derive(Debug, Clone, PartialEq, Eq)]
218pub struct InboxOffset {
219 pub entity_id: String,
220 pub channel_id: String,
221 pub last_envelope_id: Option<String>,
222 pub updated_at: DateTime<Utc>,
223}
224
225pub async fn connect(postgres_dsn: &str, redis_url: &str) -> Result<Storage, StorageError> {
227 let (client, connection) = tokio_postgres::connect(postgres_dsn, NoTls)
228 .await
229 .map_err(|_| StorageError::Postgres)?;
230 let task = tokio::spawn(async move {
231 if let Err(error) = connection.await {
232 tracing::error!("postgres connection stopped: {}", error);
233 }
234 });
235 let redis_client = redis::Client::open(redis_url).map_err(|_| StorageError::Redis)?;
236 let redis_connection = redis_client
237 .get_multiplexed_async_connection()
238 .await
239 .map_err(|_| StorageError::Redis)?;
240 Ok(Storage {
241 client,
242 _pg_task: task,
243 redis: Arc::new(Mutex::new(redis_connection)),
244 })
245}
246
247impl Storage {
248 pub async fn migrate(&self) -> Result<(), StorageError> {
250 self.client
251 .batch_execute(INIT_SQL)
252 .await
253 .map_err(|_| StorageError::Postgres)?;
254 self.client
255 .batch_execute(PAIRING_SQL)
256 .await
257 .map_err(|_| StorageError::Postgres)
258 }
259
260 pub async fn readiness(&self) -> Result<(), StorageError> {
262 self.client
263 .simple_query("SELECT 1")
264 .await
265 .map_err(|_| StorageError::Postgres)?;
266 let mut conn = self.redis.lock().await;
267 let _: String = redis::cmd("PING")
268 .query_async::<_, String>(&mut *conn)
269 .await
270 .map_err(|_| StorageError::Redis)?;
271 Ok(())
272 }
273
274 pub async fn create_pairing_token(
276 &self,
277 user_id: &str,
278 issuer_device_id: &str,
279 ttl_seconds: i64,
280 ) -> Result<PairingTokenIssued, StorageError> {
281 let issuer = self.load_device(issuer_device_id).await?;
282 if issuer.user_id != user_id || issuer.status != "active" {
283 return Err(StorageError::Invalid);
284 }
285 let ttl = ttl_seconds.clamp(60, 3600);
286 let issued_at = Utc::now();
287 let expires_at = issued_at + Duration::seconds(ttl);
288 for _ in 0..16 {
289 let pair_code = generate_pair_code();
290 let inserted = self
291 .client
292 .execute(
293 "INSERT INTO device_pairing (pair_code, user_id, issuer_device_id, issued_at, expires_at)
294 VALUES ($1, $2, $3, $4, $5)
295 ON CONFLICT (pair_code) DO NOTHING",
296 &[&pair_code, &user_id, &issuer_device_id, &issued_at, &expires_at],
297 )
298 .await
299 .map_err(|_| StorageError::Postgres)?;
300 if inserted == 1 {
301 return Ok(PairingTokenIssued {
302 pair_code,
303 issued_at,
304 expires_at,
305 });
306 }
307 }
308 Err(StorageError::Postgres)
309 }
310
311 pub async fn upsert_device(&self, record: &DeviceRecord) -> Result<(), StorageError> {
313 let query = "INSERT INTO user_device (opaque_id, user_id, pubkey, status, created_at) VALUES ($1, $2, $3, $4, $5)
314 ON CONFLICT (opaque_id) DO UPDATE SET pubkey = excluded.pubkey, status = excluded.status
315 WHERE user_device.user_id = excluded.user_id";
316 self.client
317 .execute(
318 query,
319 &[
320 &record.device_id,
321 &record.user_id,
322 &record.public_key,
323 &record.status,
324 &record.created_at,
325 ],
326 )
327 .await
328 .map_err(|_| StorageError::Postgres)?;
329 Ok(())
330 }
331
332 pub async fn record_device_key_event(
334 &self,
335 event: &DeviceKeyEvent,
336 ) -> Result<(), StorageError> {
337 let query = "INSERT INTO device_key_event (event_id, device_id, public_key, recorded_at) VALUES ($1, $2, $3, $4)";
338 self.client
339 .execute(
340 query,
341 &[
342 &event.event_id,
343 &event.device_id,
344 &event.public_key,
345 &event.recorded_at,
346 ],
347 )
348 .await
349 .map_err(|_| StorageError::Postgres)?;
350 Ok(())
351 }
352
353 pub async fn claim_pairing_token(
355 &self,
356 pair_code: &str,
357 device_id: &str,
358 public_key: &[u8],
359 ) -> Result<PairingClaimResult, StorageError> {
360 let recorded_at = Utc::now();
361 let event_id = format!(
362 "pair:{}:{}",
363 device_id,
364 recorded_at.timestamp_nanos_opt().unwrap_or_default()
365 );
366 let stmt = "WITH selected AS (
367 SELECT user_id, issuer_device_id, expires_at, redeemed_at, attempts
368 FROM device_pairing
369 WHERE pair_code = $1
370 FOR UPDATE
371 ),
372 validated AS (
373 SELECT user_id, issuer_device_id
374 FROM selected
375 WHERE expires_at > now()
376 AND redeemed_at IS NULL
377 AND attempts < $6
378 ),
379 updated AS (
380 UPDATE device_pairing
381 SET redeemed_at = $5,
382 redeemed_device_id = $2,
383 public_key = $3,
384 attempts = LEAST(attempts + 1, $6)
385 WHERE pair_code = $1
386 AND EXISTS (SELECT 1 FROM validated)
387 RETURNING user_id, issuer_device_id
388 ),
389 inserted AS (
390 INSERT INTO user_device (opaque_id, user_id, pubkey, status, created_at)
391 SELECT $2, user_id, $3, 'active', $5 FROM validated
392 RETURNING user_id
393 ),
394 events AS (
395 INSERT INTO device_key_event (event_id, device_id, public_key, recorded_at)
396 SELECT $4, $2, $3, $5 FROM inserted
397 )
398 SELECT user_id, issuer_device_id FROM updated";
399 let result = self
400 .client
401 .query_opt(
402 stmt,
403 &[
404 &pair_code,
405 &device_id,
406 &public_key,
407 &event_id,
408 &recorded_at,
409 &PAIRING_MAX_ATTEMPTS,
410 ],
411 )
412 .await
413 .map_err(|_| StorageError::Postgres)?;
414 let (user_id, issuer_device_id) = match result {
415 Some(row) => {
416 let user_id: String = row.get(0);
417 let issuer_device_id: String = row.get(1);
418 (user_id, issuer_device_id)
419 }
420 None => {
421 let exists = self
422 .client
423 .query_opt(
424 "SELECT 1 FROM device_pairing WHERE pair_code = $1",
425 &[&pair_code],
426 )
427 .await
428 .map_err(|_| StorageError::Postgres)?;
429 if exists.is_some() {
430 self
431 .client
432 .execute(
433 "UPDATE device_pairing SET attempts = LEAST(attempts + 1, $2) WHERE pair_code = $1",
434 &[&pair_code, &PAIRING_MAX_ATTEMPTS],
435 )
436 .await
437 .map_err(|_| StorageError::Postgres)?;
438 return Err(StorageError::Invalid);
439 }
440 return Err(StorageError::Missing);
441 }
442 };
443 let profile = self.load_user(&user_id).await?;
444 Ok(PairingClaimResult {
445 user: profile,
446 issuer_device_id,
447 })
448 }
449
450 pub async fn invalidate_expired_pairings(&self) -> Result<u64, StorageError> {
452 let affected = self
453 .client
454 .execute(
455 "DELETE FROM device_pairing WHERE expires_at <= now() OR attempts >= $1 OR (redeemed_at IS NOT NULL AND redeemed_at <= now() - interval '1 day')",
456 &[&PAIRING_MAX_ATTEMPTS],
457 )
458 .await
459 .map_err(|_| StorageError::Postgres)?;
460 Ok(affected)
461 }
462
463 pub async fn latest_device_key_event(
465 &self,
466 device_id: &str,
467 ) -> Result<Option<DeviceKeyEvent>, StorageError> {
468 let query = "SELECT event_id, device_id, public_key, recorded_at
469 FROM device_key_event WHERE device_id = $1 ORDER BY recorded_at DESC LIMIT 1";
470 let row = self
471 .client
472 .query_opt(query, &[&device_id])
473 .await
474 .map_err(|_| StorageError::Postgres)?;
475 Ok(row.map(|row| DeviceKeyEvent {
476 event_id: row.get(0),
477 device_id: row.get(1),
478 public_key: row.get(2),
479 recorded_at: row.get(3),
480 }))
481 }
482
483 pub async fn record_session(&self, session: &SessionRecord) -> Result<(), StorageError> {
485 let query =
486 "INSERT INTO session (opaque_id, user_id, device_id, tls_fingerprint, created_at, ttl_seconds)
487 VALUES ($1, $2, $3, $4, $5, $6)";
488 self.client
489 .execute(
490 query,
491 &[
492 &session.session_id,
493 &session.user_id,
494 &session.device_id,
495 &session.tls_fingerprint,
496 &session.created_at,
497 &session.ttl_seconds,
498 ],
499 )
500 .await
501 .map_err(|_| StorageError::Postgres)?;
502 Ok(())
503 }
504
505 pub async fn load_session(&self, session_id: &str) -> Result<SessionRecord, StorageError> {
507 let row = self
508 .client
509 .query_opt(
510 "SELECT opaque_id, user_id, device_id, tls_fingerprint, created_at, ttl_seconds FROM session WHERE opaque_id = $1",
511 &[&session_id],
512 )
513 .await
514 .map_err(|_| StorageError::Postgres)?;
515 let row = row.ok_or(StorageError::Missing)?;
516 Ok(SessionRecord {
517 session_id: row.get(0),
518 user_id: row.get(1),
519 device_id: row.get(2),
520 tls_fingerprint: row.get(3),
521 created_at: row.get(4),
522 ttl_seconds: row.get(5),
523 })
524 }
525
526 pub async fn load_device(&self, device_id: &str) -> Result<DeviceRecord, StorageError> {
528 let row = self
529 .client
530 .query_opt(
531 "SELECT opaque_id, user_id, pubkey, status, created_at FROM user_device WHERE opaque_id = $1",
532 &[&device_id],
533 )
534 .await
535 .map_err(|_| StorageError::Postgres)?;
536 let row = row.ok_or(StorageError::Missing)?;
537 Ok(DeviceRecord {
538 device_id: row.get(0),
539 user_id: row.get(1),
540 public_key: row.get(2),
541 status: row.get(3),
542 created_at: row.get(4),
543 })
544 }
545
546 pub async fn count_active_devices(&self, user_id: &str) -> Result<i64, StorageError> {
548 let row = self
549 .client
550 .query_one(
551 "SELECT COUNT(*) FROM user_device WHERE user_id = $1 AND status = 'active'",
552 &[&user_id],
553 )
554 .await
555 .map_err(|_| StorageError::Postgres)?;
556 Ok(row.get(0))
557 }
558
559 pub async fn list_devices_for_user(
561 &self,
562 user_id: &str,
563 ) -> Result<Vec<DeviceRecord>, StorageError> {
564 let rows = self
565 .client
566 .query(
567 "SELECT opaque_id, user_id, pubkey, status, created_at FROM user_device WHERE user_id = $1 ORDER BY created_at ASC",
568 &[&user_id],
569 )
570 .await
571 .map_err(|_| StorageError::Postgres)?;
572 Ok(rows
573 .into_iter()
574 .map(|row| DeviceRecord {
575 device_id: row.get(0),
576 user_id: row.get(1),
577 public_key: row.get(2),
578 status: row.get(3),
579 created_at: row.get(4),
580 })
581 .collect())
582 }
583
584 pub async fn activate_device(&self, device_id: &str) -> Result<(), StorageError> {
586 self.update_device_status(device_id, "active").await
587 }
588
589 pub async fn deactivate_device(&self, device_id: &str) -> Result<(), StorageError> {
591 self.update_device_status(device_id, "revoked").await
592 }
593
594 async fn update_device_status(
595 &self,
596 device_id: &str,
597 status: &str,
598 ) -> Result<(), StorageError> {
599 let affected = self
600 .client
601 .execute(
602 "UPDATE user_device SET status = $2 WHERE opaque_id = $1",
603 &[&device_id, &status],
604 )
605 .await
606 .map_err(|_| StorageError::Postgres)?;
607 if affected == 0 {
608 return Err(StorageError::Missing);
609 }
610 Ok(())
611 }
612
613 pub async fn create_user(&self, profile: &NewUserProfile) -> Result<UserProfile, StorageError> {
615 let now = Utc::now();
616 let row = self
617 .client
618 .query_one(
619 "INSERT INTO app_user (user_id, handle, display_name, avatar_url, created_at, updated_at)
620 VALUES ($1, $2, $3, $4, $5, $5)
621 RETURNING user_id, handle, display_name, avatar_url, created_at, updated_at",
622 &[
623 &profile.user_id,
624 &profile.handle,
625 &profile.display_name,
626 &profile.avatar_url,
627 &now,
628 ],
629 )
630 .await
631 .map_err(|_| StorageError::Postgres)?;
632 Ok(UserProfile {
633 user_id: row.get(0),
634 handle: row.get(1),
635 display_name: row.get(2),
636 avatar_url: row.get(3),
637 created_at: row.get(4),
638 updated_at: row.get(5),
639 })
640 }
641
642 pub async fn load_user(&self, user_id: &str) -> Result<UserProfile, StorageError> {
644 let row = self
645 .client
646 .query_opt(
647 "SELECT user_id, handle, display_name, avatar_url, created_at, updated_at FROM app_user WHERE user_id = $1",
648 &[&user_id],
649 )
650 .await
651 .map_err(|_| StorageError::Postgres)?;
652 let row = row.ok_or(StorageError::Missing)?;
653 Ok(UserProfile {
654 user_id: row.get(0),
655 handle: row.get(1),
656 display_name: row.get(2),
657 avatar_url: row.get(3),
658 created_at: row.get(4),
659 updated_at: row.get(5),
660 })
661 }
662
663 pub async fn load_user_by_handle(&self, handle: &str) -> Result<UserProfile, StorageError> {
665 let row = self
666 .client
667 .query_opt(
668 "SELECT user_id, handle, display_name, avatar_url, created_at, updated_at FROM app_user WHERE handle = $1",
669 &[&handle],
670 )
671 .await
672 .map_err(|_| StorageError::Postgres)?;
673 let row = row.ok_or(StorageError::Missing)?;
674 Ok(UserProfile {
675 user_id: row.get(0),
676 handle: row.get(1),
677 display_name: row.get(2),
678 avatar_url: row.get(3),
679 created_at: row.get(4),
680 updated_at: row.get(5),
681 })
682 }
683
684 pub async fn update_user_profile(
686 &self,
687 user_id: &str,
688 display_name: Option<&str>,
689 avatar_url: Option<&str>,
690 ) -> Result<(), StorageError> {
691 let now = Utc::now();
692 let affected = self
693 .client
694 .execute(
695 "UPDATE app_user SET display_name = COALESCE($2, display_name), avatar_url = COALESCE($3, avatar_url), updated_at = $4 WHERE user_id = $1",
696 &[&user_id, &display_name, &avatar_url, &now],
697 )
698 .await
699 .map_err(|_| StorageError::Postgres)?;
700 if affected == 0 {
701 return Err(StorageError::Missing);
702 }
703 Ok(())
704 }
705
706 pub async fn create_group(&self, group: &ChatGroup) -> Result<(), StorageError> {
708 self.client
709 .execute(
710 "INSERT INTO chat_group (group_id, owner_device, created_at) VALUES ($1, $2, $3)
711 ON CONFLICT (group_id) DO NOTHING",
712 &[&group.group_id, &group.owner_device, &group.created_at],
713 )
714 .await
715 .map_err(|_| StorageError::Postgres)?;
716 self.client
717 .execute(
718 "INSERT INTO group_member (group_id, device_id, role, joined_at) VALUES ($1, $2, $3, $4)
719 ON CONFLICT (group_id, device_id) DO UPDATE SET role = excluded.role",
720 &[
721 &group.group_id,
722 &group.owner_device,
723 &GroupRole::Owner.as_str(),
724 &group.created_at,
725 ],
726 )
727 .await
728 .map_err(|_| StorageError::Postgres)?;
729 Ok(())
730 }
731
732 pub async fn add_group_member(&self, member: &GroupMember) -> Result<(), StorageError> {
734 let query = "INSERT INTO group_member (group_id, device_id, role, joined_at) VALUES ($1, $2, $3, $4)
735 ON CONFLICT (group_id, device_id) DO UPDATE SET role = excluded.role, joined_at = excluded.joined_at";
736 self.client
737 .execute(
738 query,
739 &[
740 &member.group_id,
741 &member.device_id,
742 &member.role.as_str(),
743 &member.joined_at,
744 ],
745 )
746 .await
747 .map_err(|_| StorageError::Postgres)?;
748 Ok(())
749 }
750
751 pub async fn remove_group_member(
753 &self,
754 group_id: &str,
755 device_id: &str,
756 ) -> Result<(), StorageError> {
757 let affected = self
758 .client
759 .execute(
760 "DELETE FROM group_member WHERE group_id = $1 AND device_id = $2",
761 &[&group_id, &device_id],
762 )
763 .await
764 .map_err(|_| StorageError::Postgres)?;
765 if affected == 0 {
766 return Err(StorageError::Missing);
767 }
768 Ok(())
769 }
770
771 pub async fn list_group_members(
773 &self,
774 group_id: &str,
775 ) -> Result<Vec<GroupMember>, StorageError> {
776 let rows = self
777 .client
778 .query(
779 "SELECT group_id, device_id, role, joined_at FROM group_member WHERE group_id = $1 ORDER BY joined_at ASC",
780 &[&group_id],
781 )
782 .await
783 .map_err(|_| StorageError::Postgres)?;
784 let mut members = Vec::with_capacity(rows.len());
785 for row in rows {
786 let role: String = row.get(2);
787 let parsed = GroupRole::from_str(role.as_str())?;
788 members.push(GroupMember {
789 group_id: row.get(0),
790 device_id: row.get(1),
791 role: parsed,
792 joined_at: row.get(3),
793 });
794 }
795 Ok(members)
796 }
797
798 pub async fn load_group(&self, group_id: &str) -> Result<ChatGroup, StorageError> {
800 let row = self
801 .client
802 .query_opt(
803 "SELECT group_id, owner_device, created_at FROM chat_group WHERE group_id = $1",
804 &[&group_id],
805 )
806 .await
807 .map_err(|_| StorageError::Postgres)?;
808 let row = row.ok_or(StorageError::Missing)?;
809 Ok(ChatGroup {
810 group_id: row.get(0),
811 owner_device: row.get(1),
812 created_at: row.get(2),
813 })
814 }
815
816 pub async fn list_groups_for_device(
818 &self,
819 device_id: &str,
820 ) -> Result<Vec<ChatGroup>, StorageError> {
821 let rows = self
822 .client
823 .query(
824 "SELECT g.group_id, g.owner_device, g.created_at FROM chat_group g
825 INNER JOIN group_member m ON g.group_id = m.group_id
826 WHERE m.device_id = $1 ORDER BY g.created_at ASC",
827 &[&device_id],
828 )
829 .await
830 .map_err(|_| StorageError::Postgres)?;
831 Ok(rows
832 .into_iter()
833 .map(|row| ChatGroup {
834 group_id: row.get(0),
835 owner_device: row.get(1),
836 created_at: row.get(2),
837 })
838 .collect())
839 }
840
841 pub async fn upsert_federation_peer(
843 &self,
844 peer: &FederationPeerRecord,
845 ) -> Result<(), StorageError> {
846 let query = "INSERT INTO federation_peer (domain, endpoint, public_key, status, updated_at)
847 VALUES ($1, $2, $3, $4, $5)
848 ON CONFLICT (domain) DO UPDATE SET endpoint = excluded.endpoint, public_key = excluded.public_key, status = excluded.status, updated_at = excluded.updated_at";
849 self.client
850 .execute(
851 query,
852 &[
853 &peer.domain,
854 &peer.endpoint,
855 &peer.public_key.as_slice(),
856 &peer.status.as_str(),
857 &peer.updated_at,
858 ],
859 )
860 .await
861 .map_err(|_| StorageError::Postgres)?;
862 Ok(())
863 }
864
865 pub async fn load_federation_peer(
867 &self,
868 domain: &str,
869 ) -> Result<FederationPeerRecord, StorageError> {
870 let row = self
871 .client
872 .query_opt(
873 "SELECT domain, endpoint, public_key, status, updated_at FROM federation_peer WHERE domain = $1",
874 &[&domain],
875 )
876 .await
877 .map_err(|_| StorageError::Postgres)?;
878 let row = row.ok_or(StorageError::Missing)?;
879 let key: Vec<u8> = row.get(2);
880 let status: String = row.get(3);
881 let status = FederationPeerStatus::from_str(status.as_str())?;
882 let public_key: [u8; 32] = key
883 .as_slice()
884 .try_into()
885 .map_err(|_| StorageError::Serialization)?;
886 Ok(FederationPeerRecord {
887 domain: row.get(0),
888 endpoint: row.get(1),
889 public_key,
890 status,
891 updated_at: row.get(4),
892 })
893 }
894
895 pub async fn list_federation_peers(&self) -> Result<Vec<FederationPeerRecord>, StorageError> {
897 let rows = self
898 .client
899 .query(
900 "SELECT domain, endpoint, public_key, status, updated_at FROM federation_peer",
901 &[],
902 )
903 .await
904 .map_err(|_| StorageError::Postgres)?;
905 let mut peers = Vec::with_capacity(rows.len());
906 for row in rows {
907 let key: Vec<u8> = row.get(2);
908 let status: String = row.get(3);
909 let status = FederationPeerStatus::from_str(status.as_str())?;
910 let public_key: [u8; 32] = key
911 .as_slice()
912 .try_into()
913 .map_err(|_| StorageError::Serialization)?;
914 peers.push(FederationPeerRecord {
915 domain: row.get(0),
916 endpoint: row.get(1),
917 public_key,
918 status,
919 updated_at: row.get(4),
920 });
921 }
922 Ok(peers)
923 }
924
925 pub async fn set_federation_peer_status(
927 &self,
928 domain: &str,
929 status: FederationPeerStatus,
930 ) -> Result<(), StorageError> {
931 let now = Utc::now();
932 let affected = self
933 .client
934 .execute(
935 "UPDATE federation_peer SET status = $2, updated_at = $3 WHERE domain = $1",
936 &[&domain, &status.as_str(), &now],
937 )
938 .await
939 .map_err(|_| StorageError::Postgres)?;
940 if affected == 0 {
941 return Err(StorageError::Missing);
942 }
943 Ok(())
944 }
945
946 pub async fn enqueue_relay(&self, envelope: &RelayEnvelope) -> Result<(), StorageError> {
948 let query =
949 "INSERT INTO relay_queue (envelope_id, channel_id, payload, deliver_after, expires_at)
950 VALUES ($1, $2, $3, $4, $5)";
951 self.client
952 .execute(
953 query,
954 &[
955 &envelope.envelope_id,
956 &envelope.channel_id,
957 &envelope.payload,
958 &envelope.deliver_after,
959 &envelope.expires_at,
960 ],
961 )
962 .await
963 .map_err(|_| StorageError::Postgres)?;
964 Ok(())
965 }
966
967 pub async fn claim_envelopes(
969 &self,
970 channel_id: &str,
971 limit: i64,
972 ) -> Result<Vec<RelayEnvelope>, StorageError> {
973 let query = "DELETE FROM relay_queue
974 WHERE envelope_id IN (
975 SELECT envelope_id FROM relay_queue
976 WHERE channel_id = $1 AND deliver_after <= now()
977 ORDER BY deliver_after ASC
978 LIMIT $2
979 )
980 RETURNING envelope_id, channel_id, payload, deliver_after, expires_at";
981 let rows = self
982 .client
983 .query(query, &[&channel_id, &limit])
984 .await
985 .map_err(|_| StorageError::Postgres)?;
986 Ok(rows
987 .into_iter()
988 .map(|row| RelayEnvelope {
989 envelope_id: row.get(0),
990 channel_id: row.get(1),
991 payload: row.get(2),
992 deliver_after: row.get(3),
993 expires_at: row.get(4),
994 })
995 .collect())
996 }
997
998 pub async fn store_inbox_offset(&self, offset: &InboxOffset) -> Result<(), StorageError> {
1000 let query = "INSERT INTO inbox_offset (entity_id, channel_id, last_envelope_id, updated_at)
1001 VALUES ($1, $2, $3, $4)
1002 ON CONFLICT (entity_id, channel_id) DO UPDATE SET last_envelope_id = excluded.last_envelope_id, updated_at = excluded.updated_at";
1003 self.client
1004 .execute(
1005 query,
1006 &[
1007 &offset.entity_id,
1008 &offset.channel_id,
1009 &offset.last_envelope_id,
1010 &offset.updated_at,
1011 ],
1012 )
1013 .await
1014 .map_err(|_| StorageError::Postgres)?;
1015 Ok(())
1016 }
1017
1018 pub async fn read_inbox_offset(
1020 &self,
1021 entity_id: &str,
1022 channel_id: &str,
1023 ) -> Result<Option<InboxOffset>, StorageError> {
1024 let row = self
1025 .client
1026 .query_opt(
1027 "SELECT entity_id, channel_id, last_envelope_id, updated_at FROM inbox_offset WHERE entity_id = $1 AND channel_id = $2",
1028 &[&entity_id, &channel_id],
1029 )
1030 .await
1031 .map_err(|_| StorageError::Postgres)?;
1032 Ok(row.map(|row| InboxOffset {
1033 entity_id: row.get(0),
1034 channel_id: row.get(1),
1035 last_envelope_id: row.get(2),
1036 updated_at: row.get(3),
1037 }))
1038 }
1039
1040 pub async fn store_idempotency(&self, key: &IdempotencyKey) -> Result<(), StorageError> {
1042 let query = "INSERT INTO idempotency (key, scope, created_at) VALUES ($1, $2, $3)
1043 ON CONFLICT (key, scope) DO NOTHING";
1044 self.client
1045 .execute(query, &[&key.key, &key.scope, &key.created_at])
1046 .await
1047 .map_err(|_| StorageError::Postgres)?;
1048 Ok(())
1049 }
1050
1051 pub async fn publish_presence(&self, snapshot: &PresenceSnapshot) -> Result<(), StorageError> {
1053 let mut conn = self.redis.lock().await;
1054 let ttl = (snapshot.expires_at.timestamp() - Utc::now().timestamp()).max(1) as usize;
1055 let payload = serde_json::json!({
1056 "entity": snapshot.entity,
1057 "state": snapshot.state,
1058 "expires_at": snapshot.expires_at.to_rfc3339(),
1059 "user": snapshot.user_id.as_ref().map(|id| serde_json::json!({
1060 "id": id,
1061 "handle": snapshot.handle.clone(),
1062 "display_name": snapshot.display_name.clone(),
1063 "avatar_url": snapshot.avatar_url.clone(),
1064 })),
1065 })
1066 .to_string();
1067 redis::cmd("SETEX")
1068 .arg(format!("presence:{}", snapshot.entity))
1069 .arg(ttl)
1070 .arg(payload)
1071 .query_async::<_, ()>(&mut *conn)
1072 .await
1073 .map_err(|_| StorageError::Redis)?;
1074 Ok(())
1075 }
1076
1077 pub async fn read_presence(
1079 &self,
1080 entity: &str,
1081 ) -> Result<Option<PresenceSnapshot>, StorageError> {
1082 let mut conn = self.redis.lock().await;
1083 let value: Option<String> = redis::cmd("GET")
1084 .arg(format!("presence:{}", entity))
1085 .query_async::<_, Option<String>>(&mut *conn)
1086 .await
1087 .map_err(|_| StorageError::Redis)?;
1088 if let Some(json) = value {
1089 let parsed: Value =
1090 serde_json::from_str(&json).map_err(|_| StorageError::Serialization)?;
1091 let state = parsed
1092 .get("state")
1093 .and_then(|v| v.as_str())
1094 .unwrap_or("online")
1095 .to_string();
1096 let expires = parsed
1097 .get("expires_at")
1098 .and_then(|v| v.as_str())
1099 .ok_or(StorageError::Serialization)?;
1100 let expires = DateTime::parse_from_rfc3339(expires)
1101 .map_err(|_| StorageError::Serialization)?
1102 .with_timezone(&Utc);
1103 let user_obj = parsed.get("user").and_then(|v| v.as_object());
1104 let user_id = user_obj
1105 .and_then(|map| map.get("id"))
1106 .and_then(|v| v.as_str())
1107 .map(|v| v.to_string());
1108 let handle = user_obj
1109 .and_then(|map| map.get("handle"))
1110 .and_then(|v| v.as_str())
1111 .map(|v| v.to_string());
1112 let display_name = user_obj
1113 .and_then(|map| map.get("display_name"))
1114 .and_then(|v| v.as_str())
1115 .map(|v| v.to_string());
1116 let avatar_url = user_obj
1117 .and_then(|map| map.get("avatar_url"))
1118 .and_then(|v| v.as_str())
1119 .map(|v| v.to_string());
1120 Ok(Some(PresenceSnapshot {
1121 entity: entity.to_string(),
1122 state,
1123 expires_at: expires,
1124 user_id,
1125 handle,
1126 display_name,
1127 avatar_url,
1128 }))
1129 } else {
1130 Ok(None)
1131 }
1132 }
1133
1134 pub async fn register_route(
1136 &self,
1137 entity: &str,
1138 session_id: &str,
1139 ttl_seconds: i64,
1140 ) -> Result<(), StorageError> {
1141 let mut conn = self.redis.lock().await;
1142 redis::cmd("SETEX")
1143 .arg(format!("route:{}", entity))
1144 .arg(ttl_seconds.max(1) as usize)
1145 .arg(session_id)
1146 .query_async::<_, ()>(&mut *conn)
1147 .await
1148 .map_err(|_| StorageError::Redis)?;
1149 Ok(())
1150 }
1151
1152 pub async fn clear_route(&self, entity: &str) -> Result<(), StorageError> {
1154 let mut conn = self.redis.lock().await;
1155 let _: () = redis::cmd("DEL")
1156 .arg(format!("route:{}", entity))
1157 .query_async::<_, ()>(&mut *conn)
1158 .await
1159 .map_err(|_| StorageError::Redis)?;
1160 Ok(())
1161 }
1162}
1163
1164fn generate_pair_code() -> String {
1165 let mut seed = [0u8; PAIRING_CODE_LENGTH];
1166 OsRng.fill_bytes(&mut seed);
1167 let mut output = String::with_capacity(PAIRING_CODE_LENGTH + 1);
1168 for (index, byte) in seed.iter().enumerate() {
1169 let symbol = PAIRING_ALPHABET[(*byte as usize) % PAIRING_ALPHABET.len()] as char;
1170 output.push(symbol);
1171 if index == (PAIRING_CODE_LENGTH / 2) - 1 {
1172 output.push('-');
1173 }
1174 }
1175 if output.ends_with('-') {
1176 output.pop();
1177 }
1178 output
1179}
1180
1181#[cfg(test)]
1182mod tests {
1183 use super::*;
1184 use chrono::Utc;
1185 use std::str::FromStr;
1186
1187 #[test]
1188 fn init_sql_exists() {
1189 assert!(INIT_SQL.contains("CREATE TABLE"));
1190 }
1191
1192 #[test]
1193 fn pairing_code_format() {
1194 let code = generate_pair_code();
1195 assert_eq!(code.len(), PAIRING_CODE_LENGTH + 1);
1196 assert!(code.contains('-'));
1197 }
1198
1199 #[test]
1200 fn init_sql_declares_new_relations() {
1201 assert!(INIT_SQL.contains("device_key_event"));
1202 assert!(INIT_SQL.contains("chat_group"));
1203 assert!(INIT_SQL.contains("group_member"));
1204 assert!(INIT_SQL.contains("federation_peer"));
1205 assert!(INIT_SQL.contains("inbox_offset"));
1206 }
1207
1208 #[test]
1209 fn pairing_sql_declares_pairing_table() {
1210 assert!(PAIRING_SQL.contains("device_pairing"));
1211 }
1212
1213 #[test]
1214 fn group_role_roundtrip() {
1215 assert_eq!(GroupRole::Owner.as_str(), "owner");
1216 assert_eq!(GroupRole::from_str("admin").unwrap(), GroupRole::Admin);
1217 assert!(GroupRole::from_str("unknown").is_err());
1218 }
1219
1220 #[test]
1221 fn federation_status_roundtrip() {
1222 assert_eq!(FederationPeerStatus::Active.as_str(), "active");
1223 assert_eq!(
1224 FederationPeerStatus::from_str("pending").unwrap(),
1225 FederationPeerStatus::Pending
1226 );
1227 assert!(FederationPeerStatus::from_str("offline").is_err());
1228 }
1229
1230 #[tokio::test]
1231 async fn storage_integration_flow() -> Result<(), Box<dyn std::error::Error>> {
1232 let pg = match std::env::var("COMMUCAT_TEST_PG_DSN") {
1233 Ok(value) => value,
1234 Err(_) => {
1235 eprintln!("skipping storage_integration_flow: COMMUCAT_TEST_PG_DSN not set");
1236 return Ok(());
1237 }
1238 };
1239 let redis = match std::env::var("COMMUCAT_TEST_REDIS_URL") {
1240 Ok(value) => value,
1241 Err(_) => {
1242 eprintln!("skipping storage_integration_flow: COMMUCAT_TEST_REDIS_URL not set");
1243 return Ok(());
1244 }
1245 };
1246 let storage = connect(&pg, &redis).await?;
1247 storage.migrate().await?;
1248 let suffix = Utc::now().timestamp_nanos_opt().unwrap_or_default();
1249 let user_profile = NewUserProfile {
1250 user_id: format!("test-user-{}", suffix),
1251 handle: format!("tester{}", suffix),
1252 display_name: Some("Tester".to_string()),
1253 avatar_url: None,
1254 };
1255 let created = storage.create_user(&user_profile).await?;
1256 let device_id = format!("test-device-{}", suffix);
1257 let device_record = DeviceRecord {
1258 device_id: device_id.clone(),
1259 user_id: created.user_id.clone(),
1260 public_key: vec![1; 32],
1261 status: "active".to_string(),
1262 created_at: Utc::now(),
1263 };
1264 storage.upsert_device(&device_record).await?;
1265 let key_event = DeviceKeyEvent {
1266 event_id: format!("evt-{}", suffix),
1267 device_id: device_id.clone(),
1268 public_key: device_record.public_key.clone(),
1269 recorded_at: Utc::now(),
1270 };
1271 storage.record_device_key_event(&key_event).await?;
1272 let latest = storage
1273 .latest_device_key_event(&device_id)
1274 .await?
1275 .expect("expected key event");
1276 assert_eq!(latest.public_key.len(), 32);
1277
1278 let group = ChatGroup {
1279 group_id: format!("group-{}", suffix),
1280 owner_device: device_id.clone(),
1281 created_at: Utc::now(),
1282 };
1283 storage.create_group(&group).await?;
1284 let member = GroupMember {
1285 group_id: group.group_id.clone(),
1286 device_id: format!("peer-device-{}", suffix),
1287 role: GroupRole::Member,
1288 joined_at: Utc::now(),
1289 };
1290 storage.add_group_member(&member).await?;
1291 let members = storage.list_group_members(&group.group_id).await?;
1292 assert!(members.iter().any(|m| m.device_id == device_id));
1293 assert!(members.iter().any(|m| m.device_id == member.device_id));
1294 let memberships = storage.list_groups_for_device(&member.device_id).await?;
1295 assert_eq!(memberships.len(), 1);
1296 storage
1297 .remove_group_member(&group.group_id, &member.device_id)
1298 .await?;
1299
1300 let peer = FederationPeerRecord {
1301 domain: format!("peer{}.example", suffix),
1302 endpoint: "https://peer.example/federation".to_string(),
1303 public_key: [5u8; 32],
1304 status: FederationPeerStatus::Active,
1305 updated_at: Utc::now(),
1306 };
1307 storage.upsert_federation_peer(&peer).await?;
1308 let fetched = storage.load_federation_peer(&peer.domain).await?;
1309 assert_eq!(fetched.endpoint, peer.endpoint);
1310
1311 let offset = InboxOffset {
1312 entity_id: device_id.clone(),
1313 channel_id: format!("inbox:{}", device_id),
1314 last_envelope_id: Some(format!("env-{}", suffix)),
1315 updated_at: Utc::now(),
1316 };
1317 storage.store_inbox_offset(&offset).await?;
1318 let loaded = storage
1319 .read_inbox_offset(&offset.entity_id, &offset.channel_id)
1320 .await?
1321 .expect("offset present");
1322 assert_eq!(loaded.last_envelope_id, offset.last_envelope_id);
1323
1324 let ticket = storage
1325 .create_pairing_token(&created.user_id, &device_id, 300)
1326 .await?;
1327 assert_eq!(ticket.pair_code.len(), 9);
1328 let paired_device = format!("paired-device-{}", suffix);
1329 let claim = storage
1330 .claim_pairing_token(&ticket.pair_code, &paired_device, &[7u8; 32])
1331 .await?;
1332 assert_eq!(claim.user.user_id, created.user_id);
1333 assert_eq!(claim.issuer_device_id, device_id);
1334 storage
1335 .client
1336 .execute(
1337 "INSERT INTO device_pairing (pair_code, user_id, issuer_device_id, issued_at, expires_at, attempts) VALUES ($1, $2, $3, now(), now() - interval '10 minutes', 0)",
1338 &[&format!("expired-{}", suffix), &created.user_id, &device_id],
1339 )
1340 .await
1341 .map_err(|_| StorageError::Postgres)?;
1342 let purged = storage.invalidate_expired_pairings().await?;
1343 assert!(purged >= 1);
1344 Ok(())
1345 }
1346}