1use chrono::{DateTime, Duration, Utc};
2use rand::{RngCore, rngs::OsRng};
3use serde_json::Value;
4use std::convert::TryInto;
5use std::error::Error;
6use std::fmt::{Display, Formatter};
7use std::str::FromStr;
8use std::sync::Arc;
9use tokio::sync::Mutex;
10use tokio::task::JoinHandle;
11use tokio_postgres::{Client, NoTls};
12
13const INIT_SQL: &str = include_str!("../migrations/001_init.sql");
14const PAIRING_SQL: &str = include_str!("../migrations/002_pairing.sql");
15const USER_BLOB_SQL: &str = include_str!("../migrations/003_user_blob.sql");
16const SERVER_SECRET_SQL: &str = include_str!("../migrations/004_server_secrets.sql");
17const DEVICE_ROTATION_SQL: &str = include_str!("../migrations/005_device_rotation.sql");
18const FEDERATION_OUTBOX_SQL: &str = include_str!("../migrations/006_federation_outbox.sql");
19const FRIEND_REQUESTS_SQL: &str = include_str!("../migrations/007_friend_requests.sql");
20const FEDERATED_IDENTIFIERS_SQL: &str = include_str!("../migrations/008_federated_identifiers.sql");
21const DEVICE_PQ_SQL: &str = include_str!("../migrations/009_device_pq.sql");
22const PAIRING_MAX_ATTEMPTS: i32 = 5;
23const PAIRING_CODE_LENGTH: usize = 8;
24const PAIRING_ALPHABET: &[u8] = b"ABCDEFGHJKMNPQRSTUVWXYZ23456789";
25
26#[derive(Debug)]
27pub enum StorageError {
28 Postgres,
29 Redis,
30 Serialization,
31 Missing,
32 Invalid,
33}
34
35impl Display for StorageError {
36 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
37 match self {
38 Self::Postgres => write!(f, "postgres failure"),
39 Self::Redis => write!(f, "redis failure"),
40 Self::Serialization => write!(f, "serialization failure"),
41 Self::Missing => write!(f, "missing record"),
42 Self::Invalid => write!(f, "invalid state"),
43 }
44 }
45}
46
47impl Error for StorageError {}
48
49pub struct Storage {
50 client: Client,
51 _pg_task: JoinHandle<()>,
52 redis: Arc<Mutex<redis::aio::MultiplexedConnection>>,
53}
54
55#[derive(Debug, Clone, PartialEq, Eq)]
56pub struct NewUserProfile {
57 pub user_id: String,
58 pub handle: String,
59 pub domain: String,
60 pub display_name: Option<String>,
61 pub avatar_url: Option<String>,
62}
63
64#[derive(Debug, Clone, PartialEq, Eq)]
65pub struct UserProfile {
66 pub user_id: String,
67 pub handle: String,
68 pub domain: String,
69 pub display_name: Option<String>,
70 pub avatar_url: Option<String>,
71 pub created_at: DateTime<Utc>,
72 pub updated_at: DateTime<Utc>,
73}
74
75#[derive(Debug, Clone, PartialEq, Eq)]
76pub struct DeviceRecord {
77 pub device_id: String,
78 pub user_id: String,
79 pub public_key: Vec<u8>,
80 pub status: String,
81 pub created_at: DateTime<Utc>,
82}
83
84#[derive(Debug, Clone, PartialEq, Eq)]
85pub struct DevicePqKeys {
86 pub device_id: String,
87 pub kem_public: Vec<u8>,
88 pub signature_public: Vec<u8>,
89 pub updated_at: DateTime<Utc>,
90}
91
92#[derive(Debug, Clone, PartialEq, Eq)]
93pub struct SessionRecord {
94 pub session_id: String,
95 pub user_id: String,
96 pub device_id: String,
97 pub tls_fingerprint: String,
98 pub created_at: DateTime<Utc>,
99 pub ttl_seconds: i64,
100}
101
102#[derive(Debug, Clone, PartialEq, Eq)]
103pub struct ServerSecretRecord {
104 pub name: String,
105 pub version: i64,
106 pub secret: Vec<u8>,
107 pub public: Option<Vec<u8>>,
108 pub metadata: Value,
109 pub created_at: DateTime<Utc>,
110 pub valid_after: DateTime<Utc>,
111 pub rotates_at: DateTime<Utc>,
112 pub expires_at: DateTime<Utc>,
113}
114
115#[derive(Debug, Clone, PartialEq, Eq)]
116pub struct RelayEnvelope {
117 pub envelope_id: String,
118 pub channel_id: String,
119 pub payload: Vec<u8>,
120 pub deliver_after: DateTime<Utc>,
121 pub expires_at: DateTime<Utc>,
122}
123
124#[derive(Debug, Clone, PartialEq, Eq)]
125pub struct IdempotencyKey {
126 pub key: String,
127 pub scope: String,
128 pub created_at: DateTime<Utc>,
129}
130
131#[derive(Debug, Clone)]
132pub struct FederationOutboxInsert<'a> {
133 pub outbox_id: &'a str,
134 pub destination: &'a str,
135 pub endpoint: &'a str,
136 pub payload: &'a serde_json::Value,
137 pub public_key: &'a [u8; 32],
138 pub next_attempt_at: DateTime<Utc>,
139}
140
141#[derive(Debug, Clone, PartialEq, Eq)]
142pub struct FederationOutboxMessage {
143 pub outbox_id: String,
144 pub destination: String,
145 pub endpoint: String,
146 pub payload: serde_json::Value,
147 pub public_key: [u8; 32],
148 pub attempts: i32,
149 pub next_attempt_at: DateTime<Utc>,
150 pub last_error: Option<String>,
151}
152
153#[derive(Debug, Clone, PartialEq, Eq)]
154pub struct PresenceSnapshot {
155 pub entity: String,
156 pub state: String,
157 pub expires_at: DateTime<Utc>,
158 pub user_id: Option<String>,
159 pub handle: Option<String>,
160 pub display_name: Option<String>,
161 pub avatar_url: Option<String>,
162}
163
164fn presence_user_payload(snapshot: &PresenceSnapshot) -> Option<serde_json::Value> {
165 snapshot.user_id.as_ref().map(|id| {
166 let user_id = id.clone();
167 serde_json::json!({
168 "id": user_id.clone(),
169 "user_id": user_id,
170 "handle": snapshot.handle.clone(),
171 "display_name": snapshot.display_name.clone(),
172 "avatar_url": snapshot.avatar_url.clone(),
173 })
174 })
175}
176
177fn presence_user_fields(
178 map: &serde_json::Map<String, serde_json::Value>,
179) -> (
180 Option<String>,
181 Option<String>,
182 Option<String>,
183 Option<String>,
184) {
185 let user_id = map
186 .get("user_id")
187 .or_else(|| map.get("id"))
188 .and_then(|v| v.as_str())
189 .map(|v| v.to_string());
190 let handle = map
191 .get("handle")
192 .and_then(|v| v.as_str())
193 .map(|v| v.to_string());
194 let display_name = map
195 .get("display_name")
196 .and_then(|v| v.as_str())
197 .map(|v| v.to_string());
198 let avatar_url = map
199 .get("avatar_url")
200 .and_then(|v| v.as_str())
201 .map(|v| v.to_string());
202 (user_id, handle, display_name, avatar_url)
203}
204
205#[derive(Debug, Clone, PartialEq, Eq)]
206pub struct DeviceKeyEvent {
207 pub event_id: String,
208 pub device_id: String,
209 pub public_key: Vec<u8>,
210 pub recorded_at: DateTime<Utc>,
211}
212
213#[derive(Debug, Clone, PartialEq, Eq)]
214pub struct FriendRequest {
215 pub id: String,
216 pub from_user_id: String,
217 pub to_user_id: String,
218 pub status: String, pub message: Option<String>,
220 pub created_at: DateTime<Utc>,
221 pub updated_at: DateTime<Utc>,
222}
223
224#[derive(Debug, Clone, PartialEq, Eq)]
225pub struct DeviceRotationAudit {
226 pub rotation_id: String,
227 pub device_id: String,
228 pub old_public_key: Vec<u8>,
229 pub new_public_key: Vec<u8>,
230 pub signature: Vec<u8>,
231 pub nonce: Option<Vec<u8>>,
232 pub proof_expires_at: DateTime<Utc>,
233 pub applied_at: DateTime<Utc>,
234}
235
236#[derive(Debug, Clone)]
237pub struct DeviceRotationRecord<'a> {
238 pub rotation_id: &'a str,
239 pub device_id: &'a str,
240 pub user_id: &'a str,
241 pub old_public_key: &'a [u8],
242 pub new_public_key: &'a [u8],
243 pub signature: &'a [u8],
244 pub nonce: Option<&'a [u8]>,
245 pub proof_expires_at: DateTime<Utc>,
246 pub applied_at: DateTime<Utc>,
247 pub event_id: &'a str,
248}
249
250#[derive(Debug, Clone, PartialEq, Eq)]
251pub struct PairingTokenIssued {
252 pub pair_code: String,
253 pub issued_at: DateTime<Utc>,
254 pub expires_at: DateTime<Utc>,
255}
256
257#[derive(Debug, Clone, PartialEq, Eq)]
258pub struct PairingClaimResult {
259 pub user: UserProfile,
260 pub issuer_device_id: String,
261}
262
263#[derive(Debug, Clone, PartialEq, Eq)]
265pub struct RemoteUserProfile {
266 pub user_id: String,
267 pub domain: String,
268 pub handle: String,
269 pub display_name: Option<String>,
270 pub avatar_url: Option<String>,
271 pub profile_data: serde_json::Value,
272 pub cached_at: DateTime<Utc>,
273 pub expires_at: DateTime<Utc>,
274 pub last_fetched_at: DateTime<Utc>,
275}
276
277#[derive(Debug, Clone, PartialEq, Eq)]
279pub struct FederationPeerConnectionStatus {
280 pub domain: String,
281 pub endpoint: String,
282 pub public_key: Vec<u8>,
283 pub last_seen_at: Option<DateTime<Utc>>,
284 pub last_error: Option<String>,
285 pub error_count: i32,
286 pub status: String, pub created_at: DateTime<Utc>,
288 pub updated_at: DateTime<Utc>,
289}
290
291#[derive(Debug, Clone, PartialEq, Eq)]
293pub struct FederatedFriendRequest {
294 pub request_id: String,
295 pub from_user_id: String, pub to_user_id: String, pub from_domain: String,
298 pub to_domain: String,
299 pub message: Option<String>,
300 pub status: String, pub federation_event_id: Option<String>,
302 pub created_at: DateTime<Utc>,
303 pub updated_at: DateTime<Utc>,
304}
305
306#[derive(Debug, Clone, PartialEq, Eq)]
307pub struct ChatGroup {
308 pub group_id: String,
309 pub owner_device: String,
310 pub created_at: DateTime<Utc>,
311}
312
313#[derive(Debug, Clone, PartialEq, Eq)]
314pub enum GroupRole {
315 Owner,
316 Admin,
317 Member,
318}
319
320impl GroupRole {
321 fn as_str(&self) -> &'static str {
322 match self {
323 GroupRole::Owner => "owner",
324 GroupRole::Admin => "admin",
325 GroupRole::Member => "member",
326 }
327 }
328}
329
330impl FromStr for GroupRole {
331 type Err = StorageError;
332
333 fn from_str(value: &str) -> Result<Self, Self::Err> {
334 match value {
335 "owner" => Ok(GroupRole::Owner),
336 "admin" => Ok(GroupRole::Admin),
337 "member" => Ok(GroupRole::Member),
338 _ => Err(StorageError::Serialization),
339 }
340 }
341}
342
343#[derive(Debug, Clone, PartialEq, Eq)]
344pub struct GroupMember {
345 pub group_id: String,
346 pub device_id: String,
347 pub role: GroupRole,
348 pub joined_at: DateTime<Utc>,
349}
350
351#[derive(Debug, Clone, PartialEq, Eq)]
352pub enum FederationPeerStatus {
353 Active,
354 Pending,
355 Blocked,
356}
357
358impl FederationPeerStatus {
359 fn as_str(&self) -> &'static str {
360 match self {
361 FederationPeerStatus::Active => "active",
362 FederationPeerStatus::Pending => "pending",
363 FederationPeerStatus::Blocked => "blocked",
364 }
365 }
366}
367
368impl FromStr for FederationPeerStatus {
369 type Err = StorageError;
370
371 fn from_str(value: &str) -> Result<Self, Self::Err> {
372 match value {
373 "active" => Ok(FederationPeerStatus::Active),
374 "pending" => Ok(FederationPeerStatus::Pending),
375 "blocked" => Ok(FederationPeerStatus::Blocked),
376 _ => Err(StorageError::Serialization),
377 }
378 }
379}
380
381#[derive(Debug, Clone, PartialEq, Eq)]
382pub struct FederationPeerRecord {
383 pub domain: String,
384 pub endpoint: String,
385 pub public_key: [u8; 32],
386 pub status: FederationPeerStatus,
387 pub updated_at: DateTime<Utc>,
388}
389
390#[derive(Debug, Clone, PartialEq, Eq)]
391pub struct InboxOffset {
392 pub entity_id: String,
393 pub channel_id: String,
394 pub last_envelope_id: Option<String>,
395 pub updated_at: DateTime<Utc>,
396}
397
398pub async fn connect(postgres_dsn: &str, redis_url: &str) -> Result<Storage, StorageError> {
400 let (client, connection) = tokio_postgres::connect(postgres_dsn, NoTls)
401 .await
402 .map_err(|_| StorageError::Postgres)?;
403 let task = tokio::spawn(async move {
404 if let Err(error) = connection.await {
405 tracing::error!("postgres connection stopped: {}", error);
406 }
407 });
408 let redis_client = redis::Client::open(redis_url).map_err(|_| StorageError::Redis)?;
409 let redis_connection = redis_client
410 .get_multiplexed_async_connection()
411 .await
412 .map_err(|_| StorageError::Redis)?;
413 Ok(Storage {
414 client,
415 _pg_task: task,
416 redis: Arc::new(Mutex::new(redis_connection)),
417 })
418}
419
420impl Storage {
421 pub async fn migrate(&self) -> Result<(), StorageError> {
423 self.client
424 .batch_execute(INIT_SQL)
425 .await
426 .map_err(|_| StorageError::Postgres)?;
427 self.client
428 .batch_execute(PAIRING_SQL)
429 .await
430 .map_err(|_| StorageError::Postgres)?;
431 self.client
432 .batch_execute(USER_BLOB_SQL)
433 .await
434 .map_err(|_| StorageError::Postgres)?;
435 self.client
436 .batch_execute(SERVER_SECRET_SQL)
437 .await
438 .map_err(|_| StorageError::Postgres)?;
439 self.client
440 .batch_execute(DEVICE_ROTATION_SQL)
441 .await
442 .map_err(|_| StorageError::Postgres)?;
443 self.client
444 .batch_execute(FEDERATION_OUTBOX_SQL)
445 .await
446 .map_err(|_| StorageError::Postgres)?;
447 self.client
448 .batch_execute(FRIEND_REQUESTS_SQL)
449 .await
450 .map_err(|_| StorageError::Postgres)?;
451 self.client
452 .batch_execute(FEDERATED_IDENTIFIERS_SQL)
453 .await
454 .map_err(|_| StorageError::Postgres)?;
455 self.client
456 .batch_execute(DEVICE_PQ_SQL)
457 .await
458 .map_err(|_| StorageError::Postgres)?;
459 Ok(())
460 }
461
462 pub async fn readiness(&self) -> Result<(), StorageError> {
464 self.client
465 .simple_query("SELECT 1")
466 .await
467 .map_err(|_| StorageError::Postgres)?;
468 let mut conn = self.redis.lock().await;
469 let _: String = redis::cmd("PING")
470 .query_async::<String>(&mut *conn)
471 .await
472 .map_err(|_| StorageError::Redis)?;
473 Ok(())
474 }
475
476 pub async fn read_user_blob(
478 &self,
479 user_id: &str,
480 key: &str,
481 ) -> Result<Option<String>, StorageError> {
482 let row = self
483 .client
484 .query_opt(
485 "SELECT payload FROM user_blob WHERE user_id = $1 AND key = $2",
486 &[&user_id, &key],
487 )
488 .await
489 .map_err(|_| StorageError::Postgres)?;
490 Ok(row.map(|row| row.get(0)))
491 }
492
493 pub async fn write_user_blob(
495 &self,
496 user_id: &str,
497 key: &str,
498 payload: &str,
499 ) -> Result<(), StorageError> {
500 let now = Utc::now();
501 self.client
502 .execute(
503 "INSERT INTO user_blob (user_id, key, payload, updated_at) VALUES ($1, $2, $3, $4)
504 ON CONFLICT (user_id, key) DO UPDATE SET payload = excluded.payload, updated_at = excluded.updated_at",
505 &[&user_id, &key, &payload, &now],
506 )
507 .await
508 .map_err(|_| StorageError::Postgres)?;
509 Ok(())
510 }
511
512 pub async fn insert_server_secret(
514 &self,
515 record: &ServerSecretRecord,
516 ) -> Result<(), StorageError> {
517 let query = "INSERT INTO server_secret (name, version, secret, public, metadata, created_at, valid_after, rotates_at, expires_at)
518 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)";
519 self.client
520 .execute(
521 query,
522 &[
523 &record.name,
524 &record.version,
525 &record.secret,
526 &record.public,
527 &record.metadata,
528 &record.created_at,
529 &record.valid_after,
530 &record.rotates_at,
531 &record.expires_at,
532 ],
533 )
534 .await
535 .map_err(|_| StorageError::Postgres)?;
536 Ok(())
537 }
538
539 pub async fn active_server_secrets(
541 &self,
542 name: &str,
543 moment: DateTime<Utc>,
544 ) -> Result<Vec<ServerSecretRecord>, StorageError> {
545 let query = "SELECT name, version, secret, public, metadata, created_at, valid_after, rotates_at, expires_at
546 FROM server_secret
547 WHERE name = $1 AND valid_after <= $2 AND expires_at > $2
548 ORDER BY version DESC";
549 let rows = self
550 .client
551 .query(query, &[&name, &moment])
552 .await
553 .map_err(|_| StorageError::Postgres)?;
554 Ok(rows
555 .into_iter()
556 .map(|row| ServerSecretRecord {
557 name: row.get(0),
558 version: row.get(1),
559 secret: row.get(2),
560 public: row.get(3),
561 metadata: row.get(4),
562 created_at: row.get(5),
563 valid_after: row.get(6),
564 rotates_at: row.get(7),
565 expires_at: row.get(8),
566 })
567 .collect())
568 }
569
570 pub async fn latest_server_secret_version(&self, name: &str) -> Result<i64, StorageError> {
572 let row = self
573 .client
574 .query_one(
575 "SELECT COALESCE(MAX(version), 0) FROM server_secret WHERE name = $1",
576 &[&name],
577 )
578 .await
579 .map_err(|_| StorageError::Postgres)?;
580 Ok(row.get(0))
581 }
582
583 pub async fn delete_expired_server_secrets(
585 &self,
586 name: &str,
587 threshold: DateTime<Utc>,
588 ) -> Result<u64, StorageError> {
589 let affected = self
590 .client
591 .execute(
592 "DELETE FROM server_secret WHERE name = $1 AND expires_at <= $2",
593 &[&name, &threshold],
594 )
595 .await
596 .map_err(|_| StorageError::Postgres)?;
597 Ok(affected)
598 }
599
600 pub async fn create_pairing_token(
602 &self,
603 user_id: &str,
604 issuer_device_id: &str,
605 ttl_seconds: i64,
606 ) -> Result<PairingTokenIssued, StorageError> {
607 let issuer = self.load_device(issuer_device_id).await?;
608 if issuer.user_id != user_id || issuer.status != "active" {
609 return Err(StorageError::Invalid);
610 }
611 let ttl = ttl_seconds.clamp(60, 3600);
612 let issued_at = Utc::now();
613 let expires_at = issued_at + Duration::seconds(ttl);
614 for _ in 0..16 {
615 let pair_code = generate_pair_code();
616 let inserted = self
617 .client
618 .execute(
619 "INSERT INTO device_pairing (pair_code, user_id, issuer_device_id, issued_at, expires_at)
620 VALUES ($1, $2, $3, $4, $5)
621 ON CONFLICT (pair_code) DO NOTHING",
622 &[&pair_code, &user_id, &issuer_device_id, &issued_at, &expires_at],
623 )
624 .await
625 .map_err(|_| StorageError::Postgres)?;
626 if inserted == 1 {
627 return Ok(PairingTokenIssued {
628 pair_code,
629 issued_at,
630 expires_at,
631 });
632 }
633 }
634 Err(StorageError::Postgres)
635 }
636
637 pub async fn upsert_device(&self, record: &DeviceRecord) -> Result<(), StorageError> {
639 let query = "INSERT INTO user_device (opaque_id, user_id, pubkey, status, created_at) VALUES ($1, $2, $3, $4, $5)
640 ON CONFLICT (opaque_id) DO UPDATE SET pubkey = excluded.pubkey, status = excluded.status
641 WHERE user_device.user_id = excluded.user_id";
642 self.client
643 .execute(
644 query,
645 &[
646 &record.device_id,
647 &record.user_id,
648 &record.public_key,
649 &record.status,
650 &record.created_at,
651 ],
652 )
653 .await
654 .map_err(|_| StorageError::Postgres)?;
655 Ok(())
656 }
657
658 pub async fn record_device_key_event(
660 &self,
661 event: &DeviceKeyEvent,
662 ) -> Result<(), StorageError> {
663 let query = "INSERT INTO device_key_event (event_id, device_id, public_key, recorded_at) VALUES ($1, $2, $3, $4)";
664 self.client
665 .execute(
666 query,
667 &[
668 &event.event_id,
669 &event.device_id,
670 &event.public_key,
671 &event.recorded_at,
672 ],
673 )
674 .await
675 .map_err(|_| StorageError::Postgres)?;
676 Ok(())
677 }
678
679 pub async fn apply_device_key_rotation(
681 &self,
682 rotation: &DeviceRotationRecord<'_>,
683 ) -> Result<(), StorageError> {
684 self.client
685 .batch_execute("BEGIN")
686 .await
687 .map_err(|_| StorageError::Postgres)?;
688 let update_result = self
689 .client
690 .execute(
691 "UPDATE user_device SET pubkey = $1 WHERE opaque_id = $2 AND user_id = $3",
692 &[
693 &rotation.new_public_key,
694 &rotation.device_id,
695 &rotation.user_id,
696 ],
697 )
698 .await;
699 let updated = match update_result {
700 Ok(value) => value,
701 Err(_) => {
702 let _ = self.client.batch_execute("ROLLBACK").await;
703 return Err(StorageError::Postgres);
704 }
705 };
706 if updated != 1 {
707 let _ = self.client.batch_execute("ROLLBACK").await;
708 return Err(StorageError::Missing);
709 }
710 if self
711 .client
712 .execute(
713 "INSERT INTO device_key_event (event_id, device_id, public_key, recorded_at) VALUES ($1, $2, $3, $4)",
714 &[&rotation.event_id, &rotation.device_id, &rotation.new_public_key, &rotation.applied_at],
715 )
716 .await
717 .is_err()
718 {
719 let _ = self.client.batch_execute("ROLLBACK").await;
720 return Err(StorageError::Postgres);
721 }
722 if self
723 .client
724 .execute(
725 "INSERT INTO device_rotation_audit (rotation_id, device_id, old_public_key, new_public_key, signature, nonce, proof_expires_at, applied_at)
726 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
727 &[
728 &rotation.rotation_id,
729 &rotation.device_id,
730 &rotation.old_public_key,
731 &rotation.new_public_key,
732 &rotation.signature,
733 &rotation.nonce,
734 &rotation.proof_expires_at,
735 &rotation.applied_at,
736 ],
737 )
738 .await
739 .is_err()
740 {
741 let _ = self.client.batch_execute("ROLLBACK").await;
742 return Err(StorageError::Postgres);
743 }
744 if self.client.batch_execute("COMMIT").await.is_err() {
745 let _ = self.client.batch_execute("ROLLBACK").await;
746 return Err(StorageError::Postgres);
747 }
748 Ok(())
749 }
750
751 pub async fn claim_pairing_token(
753 &self,
754 pair_code: &str,
755 device_id: &str,
756 public_key: &[u8],
757 ) -> Result<PairingClaimResult, StorageError> {
758 let recorded_at = Utc::now();
759 let event_id = format!(
760 "pair:{}:{}",
761 device_id,
762 recorded_at.timestamp_nanos_opt().unwrap_or_default()
763 );
764 let stmt = "WITH selected AS (
765 SELECT user_id, issuer_device_id, expires_at, redeemed_at, attempts
766 FROM device_pairing
767 WHERE pair_code = $1
768 FOR UPDATE
769 ),
770 validated AS (
771 SELECT user_id, issuer_device_id
772 FROM selected
773 WHERE expires_at > now()
774 AND redeemed_at IS NULL
775 AND attempts < $6
776 ),
777 updated AS (
778 UPDATE device_pairing
779 SET redeemed_at = $5,
780 redeemed_device_id = $2,
781 public_key = $3,
782 attempts = LEAST(attempts + 1, $6)
783 WHERE pair_code = $1
784 AND EXISTS (SELECT 1 FROM validated)
785 RETURNING user_id, issuer_device_id
786 ),
787 inserted AS (
788 INSERT INTO user_device (opaque_id, user_id, pubkey, status, created_at)
789 SELECT $2, user_id, $3, 'active', $5 FROM validated
790 RETURNING user_id
791 ),
792 events AS (
793 INSERT INTO device_key_event (event_id, device_id, public_key, recorded_at)
794 SELECT $4, $2, $3, $5 FROM inserted
795 )
796 SELECT user_id, issuer_device_id FROM updated";
797 let result = self
798 .client
799 .query_opt(
800 stmt,
801 &[
802 &pair_code,
803 &device_id,
804 &public_key,
805 &event_id,
806 &recorded_at,
807 &PAIRING_MAX_ATTEMPTS,
808 ],
809 )
810 .await
811 .map_err(|_| StorageError::Postgres)?;
812 let (user_id, issuer_device_id) = match result {
813 Some(row) => {
814 let user_id: String = row.get(0);
815 let issuer_device_id: String = row.get(1);
816 (user_id, issuer_device_id)
817 }
818 None => {
819 let exists = self
820 .client
821 .query_opt(
822 "SELECT 1 FROM device_pairing WHERE pair_code = $1",
823 &[&pair_code],
824 )
825 .await
826 .map_err(|_| StorageError::Postgres)?;
827 if exists.is_some() {
828 self
829 .client
830 .execute(
831 "UPDATE device_pairing SET attempts = LEAST(attempts + 1, $2) WHERE pair_code = $1",
832 &[&pair_code, &PAIRING_MAX_ATTEMPTS],
833 )
834 .await
835 .map_err(|_| StorageError::Postgres)?;
836 return Err(StorageError::Invalid);
837 }
838 return Err(StorageError::Missing);
839 }
840 };
841 let profile = self.load_user(&user_id).await?;
842 Ok(PairingClaimResult {
843 user: profile,
844 issuer_device_id,
845 })
846 }
847
848 pub async fn invalidate_expired_pairings(&self) -> Result<u64, StorageError> {
850 let affected = self
851 .client
852 .execute(
853 "DELETE FROM device_pairing WHERE expires_at <= now() OR attempts >= $1 OR (redeemed_at IS NOT NULL AND redeemed_at <= now() - interval '1 day')",
854 &[&PAIRING_MAX_ATTEMPTS],
855 )
856 .await
857 .map_err(|_| StorageError::Postgres)?;
858 Ok(affected)
859 }
860
861 pub async fn latest_device_key_event(
863 &self,
864 device_id: &str,
865 ) -> Result<Option<DeviceKeyEvent>, StorageError> {
866 let query = "SELECT event_id, device_id, public_key, recorded_at
867 FROM device_key_event WHERE device_id = $1 ORDER BY recorded_at DESC LIMIT 1";
868 let row = self
869 .client
870 .query_opt(query, &[&device_id])
871 .await
872 .map_err(|_| StorageError::Postgres)?;
873 Ok(row.map(|row| DeviceKeyEvent {
874 event_id: row.get(0),
875 device_id: row.get(1),
876 public_key: row.get(2),
877 recorded_at: row.get(3),
878 }))
879 }
880
881 pub async fn record_session(&self, session: &SessionRecord) -> Result<(), StorageError> {
883 let query =
884 "INSERT INTO session (opaque_id, user_id, device_id, tls_fingerprint, created_at, ttl_seconds)
885 VALUES ($1, $2, $3, $4, $5, $6)";
886 self.client
887 .execute(
888 query,
889 &[
890 &session.session_id,
891 &session.user_id,
892 &session.device_id,
893 &session.tls_fingerprint,
894 &session.created_at,
895 &session.ttl_seconds,
896 ],
897 )
898 .await
899 .map_err(|_| StorageError::Postgres)?;
900 Ok(())
901 }
902
903 pub async fn load_session(&self, session_id: &str) -> Result<SessionRecord, StorageError> {
905 let row = self
906 .client
907 .query_opt(
908 "SELECT opaque_id, user_id, device_id, tls_fingerprint, created_at, ttl_seconds FROM session WHERE opaque_id = $1",
909 &[&session_id],
910 )
911 .await
912 .map_err(|_| StorageError::Postgres)?;
913 let row = row.ok_or(StorageError::Missing)?;
914 Ok(SessionRecord {
915 session_id: row.get(0),
916 user_id: row.get(1),
917 device_id: row.get(2),
918 tls_fingerprint: row.get(3),
919 created_at: row.get(4),
920 ttl_seconds: row.get(5),
921 })
922 }
923
924 pub async fn load_device(&self, device_id: &str) -> Result<DeviceRecord, StorageError> {
926 let row = self
927 .client
928 .query_opt(
929 "SELECT opaque_id, user_id, pubkey, status, created_at FROM user_device WHERE opaque_id = $1",
930 &[&device_id],
931 )
932 .await
933 .map_err(|_| StorageError::Postgres)?;
934 let row = row.ok_or(StorageError::Missing)?;
935 Ok(DeviceRecord {
936 device_id: row.get(0),
937 user_id: row.get(1),
938 public_key: row.get(2),
939 status: row.get(3),
940 created_at: row.get(4),
941 })
942 }
943
944 pub async fn load_device_pq_keys(
946 &self,
947 device_id: &str,
948 ) -> Result<Option<DevicePqKeys>, StorageError> {
949 let row = self
950 .client
951 .query_opt(
952 "SELECT device_id, kem_public, signature_public, updated_at FROM device_pq_keys WHERE device_id = $1",
953 &[&device_id],
954 )
955 .await
956 .map_err(|_| StorageError::Postgres)?;
957 Ok(row.map(|row| DevicePqKeys {
958 device_id: row.get(0),
959 kem_public: row.get(1),
960 signature_public: row.get(2),
961 updated_at: row.get(3),
962 }))
963 }
964
965 pub async fn upsert_device_pq_keys(&self, keys: &DevicePqKeys) -> Result<(), StorageError> {
967 self.client
968 .execute(
969 "INSERT INTO device_pq_keys (device_id, kem_public, signature_public, updated_at)
970 VALUES ($1, $2, $3, $4)
971 ON CONFLICT (device_id) DO UPDATE
972 SET kem_public = EXCLUDED.kem_public,
973 signature_public = EXCLUDED.signature_public,
974 updated_at = EXCLUDED.updated_at",
975 &[
976 &keys.device_id,
977 &keys.kem_public,
978 &keys.signature_public,
979 &keys.updated_at,
980 ],
981 )
982 .await
983 .map_err(|_| StorageError::Postgres)?;
984 Ok(())
985 }
986
987 pub async fn count_active_devices(&self, user_id: &str) -> Result<i64, StorageError> {
989 let row = self
990 .client
991 .query_one(
992 "SELECT COUNT(*) FROM user_device WHERE user_id = $1 AND status = 'active'",
993 &[&user_id],
994 )
995 .await
996 .map_err(|_| StorageError::Postgres)?;
997 Ok(row.get(0))
998 }
999
1000 pub async fn list_devices_for_user(
1002 &self,
1003 user_id: &str,
1004 ) -> Result<Vec<DeviceRecord>, StorageError> {
1005 let rows = self
1006 .client
1007 .query(
1008 "SELECT opaque_id, user_id, pubkey, status, created_at FROM user_device WHERE user_id = $1 ORDER BY created_at ASC",
1009 &[&user_id],
1010 )
1011 .await
1012 .map_err(|_| StorageError::Postgres)?;
1013 Ok(rows
1014 .into_iter()
1015 .map(|row| DeviceRecord {
1016 device_id: row.get(0),
1017 user_id: row.get(1),
1018 public_key: row.get(2),
1019 status: row.get(3),
1020 created_at: row.get(4),
1021 })
1022 .collect())
1023 }
1024
1025 pub async fn activate_device(&self, device_id: &str) -> Result<(), StorageError> {
1027 self.update_device_status(device_id, "active").await
1028 }
1029
1030 pub async fn deactivate_device(&self, device_id: &str) -> Result<(), StorageError> {
1032 self.update_device_status(device_id, "revoked").await
1033 }
1034
1035 async fn update_device_status(
1036 &self,
1037 device_id: &str,
1038 status: &str,
1039 ) -> Result<(), StorageError> {
1040 let affected = self
1041 .client
1042 .execute(
1043 "UPDATE user_device SET status = $2 WHERE opaque_id = $1",
1044 &[&device_id, &status],
1045 )
1046 .await
1047 .map_err(|_| StorageError::Postgres)?;
1048 if affected == 0 {
1049 return Err(StorageError::Missing);
1050 }
1051 Ok(())
1052 }
1053
1054 pub async fn create_user(&self, profile: &NewUserProfile) -> Result<UserProfile, StorageError> {
1056 let now = Utc::now();
1057 let row = self
1058 .client
1059 .query_one(
1060 "INSERT INTO app_user (user_id, handle, domain, display_name, avatar_url, created_at, updated_at)
1061 VALUES ($1, $2, $3, $4, $5, $6, $6)
1062 RETURNING user_id, handle, domain, display_name, avatar_url, created_at, updated_at",
1063 &[
1064 &profile.user_id,
1065 &profile.handle,
1066 &profile.domain,
1067 &profile.display_name,
1068 &profile.avatar_url,
1069 &now,
1070 ],
1071 )
1072 .await
1073 .map_err(|_| StorageError::Postgres)?;
1074 Ok(UserProfile {
1075 user_id: row.get(0),
1076 handle: row.get(1),
1077 domain: row.get(2),
1078 display_name: row.get(3),
1079 avatar_url: row.get(4),
1080 created_at: row.get(5),
1081 updated_at: row.get(6),
1082 })
1083 }
1084
1085 pub async fn load_user(&self, user_id: &str) -> Result<UserProfile, StorageError> {
1087 let row = self
1088 .client
1089 .query_opt(
1090 "SELECT user_id, handle, domain, display_name, avatar_url, created_at, updated_at FROM app_user WHERE user_id = $1",
1091 &[&user_id],
1092 )
1093 .await
1094 .map_err(|_| StorageError::Postgres)?;
1095 let row = row.ok_or(StorageError::Missing)?;
1096 Ok(UserProfile {
1097 user_id: row.get(0),
1098 handle: row.get(1),
1099 domain: row.get(2),
1100 display_name: row.get(3),
1101 avatar_url: row.get(4),
1102 created_at: row.get(5),
1103 updated_at: row.get(6),
1104 })
1105 }
1106
1107 pub async fn load_user_by_handle(&self, handle: &str) -> Result<UserProfile, StorageError> {
1109 let row = self
1110 .client
1111 .query_opt(
1112 "SELECT user_id, handle, domain, display_name, avatar_url, created_at, updated_at FROM app_user WHERE handle = $1",
1113 &[&handle],
1114 )
1115 .await
1116 .map_err(|_| StorageError::Postgres)?;
1117 let row = row.ok_or(StorageError::Missing)?;
1118 Ok(UserProfile {
1119 user_id: row.get(0),
1120 handle: row.get(1),
1121 domain: row.get(2),
1122 display_name: row.get(3),
1123 avatar_url: row.get(4),
1124 created_at: row.get(5),
1125 updated_at: row.get(6),
1126 })
1127 }
1128
1129 pub async fn update_user_profile(
1131 &self,
1132 user_id: &str,
1133 display_name: Option<&str>,
1134 avatar_url: Option<&str>,
1135 ) -> Result<(), StorageError> {
1136 let now = Utc::now();
1137 let affected = self
1138 .client
1139 .execute(
1140 "UPDATE app_user SET display_name = COALESCE($2, display_name), avatar_url = COALESCE($3, avatar_url), updated_at = $4 WHERE user_id = $1",
1141 &[&user_id, &display_name, &avatar_url, &now],
1142 )
1143 .await
1144 .map_err(|_| StorageError::Postgres)?;
1145 if affected == 0 {
1146 return Err(StorageError::Missing);
1147 }
1148 Ok(())
1149 }
1150
1151 pub async fn update_user_avatar(
1153 &self,
1154 user_id: &str,
1155 avatar_url: &str,
1156 ) -> Result<(), StorageError> {
1157 let now = Utc::now();
1158 let affected = self
1159 .client
1160 .execute(
1161 "UPDATE app_user SET avatar_url = $2, updated_at = $3 WHERE user_id = $1",
1162 &[&user_id, &avatar_url, &now],
1163 )
1164 .await
1165 .map_err(|_| StorageError::Postgres)?;
1166 if affected == 0 {
1167 return Err(StorageError::Missing);
1168 }
1169 Ok(())
1170 }
1171
1172 pub async fn load_user_by_federated_id(
1174 &self,
1175 handle: &str,
1176 domain: &str,
1177 ) -> Result<UserProfile, StorageError> {
1178 let row = self
1179 .client
1180 .query_opt(
1181 "SELECT user_id, handle, domain, display_name, avatar_url, created_at, updated_at
1182 FROM app_user WHERE handle = $1 AND domain = $2",
1183 &[&handle, &domain],
1184 )
1185 .await
1186 .map_err(|_| StorageError::Postgres)?;
1187 let row = row.ok_or(StorageError::Missing)?;
1188 Ok(UserProfile {
1189 user_id: row.get(0),
1190 handle: row.get(1),
1191 domain: row.get(2),
1192 display_name: row.get(3),
1193 avatar_url: row.get(4),
1194 created_at: row.get(5),
1195 updated_at: row.get(6),
1196 })
1197 }
1198
1199 pub async fn cache_remote_user(&self, profile: &RemoteUserProfile) -> Result<(), StorageError> {
1201 self.client
1202 .execute(
1203 "INSERT INTO remote_user_cache
1204 (user_id, domain, handle, display_name, avatar_url, profile_data, cached_at, expires_at, last_fetched_at)
1205 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
1206 ON CONFLICT (user_id) DO UPDATE SET
1207 display_name = EXCLUDED.display_name,
1208 avatar_url = EXCLUDED.avatar_url,
1209 profile_data = EXCLUDED.profile_data,
1210 last_fetched_at = EXCLUDED.last_fetched_at,
1211 expires_at = EXCLUDED.expires_at",
1212 &[
1213 &profile.user_id,
1214 &profile.domain,
1215 &profile.handle,
1216 &profile.display_name,
1217 &profile.avatar_url,
1218 &profile.profile_data,
1219 &profile.cached_at,
1220 &profile.expires_at,
1221 &profile.last_fetched_at,
1222 ],
1223 )
1224 .await
1225 .map_err(|_| StorageError::Postgres)?;
1226 Ok(())
1227 }
1228
1229 pub async fn load_remote_user_cache(
1231 &self,
1232 user_id: &str,
1233 ) -> Result<Option<RemoteUserProfile>, StorageError> {
1234 let row = self
1235 .client
1236 .query_opt(
1237 "SELECT user_id, domain, handle, display_name, avatar_url, profile_data,
1238 cached_at, expires_at, last_fetched_at
1239 FROM remote_user_cache WHERE user_id = $1 AND expires_at > now()",
1240 &[&user_id],
1241 )
1242 .await
1243 .map_err(|_| StorageError::Postgres)?;
1244
1245 Ok(row.map(|row| RemoteUserProfile {
1246 user_id: row.get(0),
1247 domain: row.get(1),
1248 handle: row.get(2),
1249 display_name: row.get(3),
1250 avatar_url: row.get(4),
1251 profile_data: row.get(5),
1252 cached_at: row.get(6),
1253 expires_at: row.get(7),
1254 last_fetched_at: row.get(8),
1255 }))
1256 }
1257
1258 pub async fn update_federation_peer_status(
1260 &self,
1261 status: &FederationPeerConnectionStatus,
1262 ) -> Result<(), StorageError> {
1263 self.client
1264 .execute(
1265 "INSERT INTO federation_peer_status
1266 (domain, endpoint, public_key, last_seen_at, last_error, error_count, status, created_at, updated_at)
1267 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
1268 ON CONFLICT (domain) DO UPDATE SET
1269 endpoint = EXCLUDED.endpoint,
1270 public_key = EXCLUDED.public_key,
1271 last_seen_at = EXCLUDED.last_seen_at,
1272 last_error = EXCLUDED.last_error,
1273 error_count = EXCLUDED.error_count,
1274 status = EXCLUDED.status,
1275 updated_at = EXCLUDED.updated_at",
1276 &[
1277 &status.domain,
1278 &status.endpoint,
1279 &status.public_key,
1280 &status.last_seen_at,
1281 &status.last_error,
1282 &status.error_count,
1283 &status.status,
1284 &status.created_at,
1285 &status.updated_at,
1286 ],
1287 )
1288 .await
1289 .map_err(|_| StorageError::Postgres)?;
1290 Ok(())
1291 }
1292
1293 pub async fn create_federated_friend_request(
1295 &self,
1296 request: &FederatedFriendRequest,
1297 ) -> Result<(), StorageError> {
1298 self.client
1299 .execute(
1300 "INSERT INTO federated_friend_requests
1301 (request_id, from_user_id, to_user_id, from_domain, to_domain, message, status, federation_event_id, created_at, updated_at)
1302 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
1303 &[
1304 &request.request_id,
1305 &request.from_user_id,
1306 &request.to_user_id,
1307 &request.from_domain,
1308 &request.to_domain,
1309 &request.message,
1310 &request.status,
1311 &request.federation_event_id,
1312 &request.created_at,
1313 &request.updated_at,
1314 ],
1315 )
1316 .await
1317 .map_err(|_| StorageError::Postgres)?;
1318 Ok(())
1319 }
1320
1321 pub async fn update_federated_friend_request_status(
1323 &self,
1324 request_id: &str,
1325 status: &str,
1326 ) -> Result<(), StorageError> {
1327 let now = Utc::now();
1328 let affected = self
1329 .client
1330 .execute(
1331 "UPDATE federated_friend_requests SET status = $2, updated_at = $3 WHERE request_id = $1",
1332 &[&request_id, &status, &now],
1333 )
1334 .await
1335 .map_err(|_| StorageError::Postgres)?;
1336 if affected == 0 {
1337 return Err(StorageError::Missing);
1338 }
1339 Ok(())
1340 }
1341
1342 pub async fn list_federated_friend_requests(
1344 &self,
1345 user_id: &str,
1346 incoming: bool,
1347 ) -> Result<Vec<FederatedFriendRequest>, StorageError> {
1348 let query = if incoming {
1349 "SELECT request_id, from_user_id, to_user_id, from_domain, to_domain, message, status, federation_event_id, created_at, updated_at
1350 FROM federated_friend_requests WHERE to_user_id = $1 ORDER BY created_at DESC"
1351 } else {
1352 "SELECT request_id, from_user_id, to_user_id, from_domain, to_domain, message, status, federation_event_id, created_at, updated_at
1353 FROM federated_friend_requests WHERE from_user_id = $1 ORDER BY created_at DESC"
1354 };
1355
1356 let rows = self
1357 .client
1358 .query(query, &[&user_id])
1359 .await
1360 .map_err(|_| StorageError::Postgres)?;
1361
1362 Ok(rows
1363 .into_iter()
1364 .map(|row| FederatedFriendRequest {
1365 request_id: row.get(0),
1366 from_user_id: row.get(1),
1367 to_user_id: row.get(2),
1368 from_domain: row.get(3),
1369 to_domain: row.get(4),
1370 message: row.get(5),
1371 status: row.get(6),
1372 federation_event_id: row.get(7),
1373 created_at: row.get(8),
1374 updated_at: row.get(9),
1375 })
1376 .collect())
1377 }
1378
1379 pub async fn create_group(&self, group: &ChatGroup) -> Result<(), StorageError> {
1381 self.client
1382 .execute(
1383 "INSERT INTO chat_group (group_id, owner_device, created_at) VALUES ($1, $2, $3)
1384 ON CONFLICT (group_id) DO NOTHING",
1385 &[&group.group_id, &group.owner_device, &group.created_at],
1386 )
1387 .await
1388 .map_err(|_| StorageError::Postgres)?;
1389 self.client
1390 .execute(
1391 "INSERT INTO group_member (group_id, device_id, role, joined_at) VALUES ($1, $2, $3, $4)
1392 ON CONFLICT (group_id, device_id) DO UPDATE SET role = excluded.role",
1393 &[
1394 &group.group_id,
1395 &group.owner_device,
1396 &GroupRole::Owner.as_str(),
1397 &group.created_at,
1398 ],
1399 )
1400 .await
1401 .map_err(|_| StorageError::Postgres)?;
1402 Ok(())
1403 }
1404
1405 pub async fn add_group_member(&self, member: &GroupMember) -> Result<(), StorageError> {
1407 let query = "INSERT INTO group_member (group_id, device_id, role, joined_at) VALUES ($1, $2, $3, $4)
1408 ON CONFLICT (group_id, device_id) DO UPDATE SET role = excluded.role, joined_at = excluded.joined_at";
1409 self.client
1410 .execute(
1411 query,
1412 &[
1413 &member.group_id,
1414 &member.device_id,
1415 &member.role.as_str(),
1416 &member.joined_at,
1417 ],
1418 )
1419 .await
1420 .map_err(|_| StorageError::Postgres)?;
1421 Ok(())
1422 }
1423
1424 pub async fn remove_group_member(
1426 &self,
1427 group_id: &str,
1428 device_id: &str,
1429 ) -> Result<(), StorageError> {
1430 let affected = self
1431 .client
1432 .execute(
1433 "DELETE FROM group_member WHERE group_id = $1 AND device_id = $2",
1434 &[&group_id, &device_id],
1435 )
1436 .await
1437 .map_err(|_| StorageError::Postgres)?;
1438 if affected == 0 {
1439 return Err(StorageError::Missing);
1440 }
1441 Ok(())
1442 }
1443
1444 pub async fn list_group_members(
1446 &self,
1447 group_id: &str,
1448 ) -> Result<Vec<GroupMember>, StorageError> {
1449 let rows = self
1450 .client
1451 .query(
1452 "SELECT group_id, device_id, role, joined_at FROM group_member WHERE group_id = $1 ORDER BY joined_at ASC",
1453 &[&group_id],
1454 )
1455 .await
1456 .map_err(|_| StorageError::Postgres)?;
1457 let mut members = Vec::with_capacity(rows.len());
1458 for row in rows {
1459 let role: String = row.get(2);
1460 let parsed = GroupRole::from_str(role.as_str())?;
1461 members.push(GroupMember {
1462 group_id: row.get(0),
1463 device_id: row.get(1),
1464 role: parsed,
1465 joined_at: row.get(3),
1466 });
1467 }
1468 Ok(members)
1469 }
1470
1471 pub async fn load_group(&self, group_id: &str) -> Result<ChatGroup, StorageError> {
1473 let row = self
1474 .client
1475 .query_opt(
1476 "SELECT group_id, owner_device, created_at FROM chat_group WHERE group_id = $1",
1477 &[&group_id],
1478 )
1479 .await
1480 .map_err(|_| StorageError::Postgres)?;
1481 let row = row.ok_or(StorageError::Missing)?;
1482 Ok(ChatGroup {
1483 group_id: row.get(0),
1484 owner_device: row.get(1),
1485 created_at: row.get(2),
1486 })
1487 }
1488
1489 pub async fn list_groups_for_device(
1491 &self,
1492 device_id: &str,
1493 ) -> Result<Vec<ChatGroup>, StorageError> {
1494 let rows = self
1495 .client
1496 .query(
1497 "SELECT g.group_id, g.owner_device, g.created_at FROM chat_group g
1498 INNER JOIN group_member m ON g.group_id = m.group_id
1499 WHERE m.device_id = $1 ORDER BY g.created_at ASC",
1500 &[&device_id],
1501 )
1502 .await
1503 .map_err(|_| StorageError::Postgres)?;
1504 Ok(rows
1505 .into_iter()
1506 .map(|row| ChatGroup {
1507 group_id: row.get(0),
1508 owner_device: row.get(1),
1509 created_at: row.get(2),
1510 })
1511 .collect())
1512 }
1513
1514 pub async fn upsert_federation_peer(
1516 &self,
1517 peer: &FederationPeerRecord,
1518 ) -> Result<(), StorageError> {
1519 let query = "INSERT INTO federation_peer (domain, endpoint, public_key, status, updated_at)
1520 VALUES ($1, $2, $3, $4, $5)
1521 ON CONFLICT (domain) DO UPDATE SET endpoint = excluded.endpoint, public_key = excluded.public_key, status = excluded.status, updated_at = excluded.updated_at";
1522 self.client
1523 .execute(
1524 query,
1525 &[
1526 &peer.domain,
1527 &peer.endpoint,
1528 &peer.public_key.as_slice(),
1529 &peer.status.as_str(),
1530 &peer.updated_at,
1531 ],
1532 )
1533 .await
1534 .map_err(|_| StorageError::Postgres)?;
1535 Ok(())
1536 }
1537
1538 pub async fn load_federation_peer(
1540 &self,
1541 domain: &str,
1542 ) -> Result<FederationPeerRecord, StorageError> {
1543 let row = self
1544 .client
1545 .query_opt(
1546 "SELECT domain, endpoint, public_key, status, updated_at FROM federation_peer WHERE domain = $1",
1547 &[&domain],
1548 )
1549 .await
1550 .map_err(|_| StorageError::Postgres)?;
1551 let row = row.ok_or(StorageError::Missing)?;
1552 let key: Vec<u8> = row.get(2);
1553 let status: String = row.get(3);
1554 let status = FederationPeerStatus::from_str(status.as_str())?;
1555 let public_key: [u8; 32] = key
1556 .as_slice()
1557 .try_into()
1558 .map_err(|_| StorageError::Serialization)?;
1559 Ok(FederationPeerRecord {
1560 domain: row.get(0),
1561 endpoint: row.get(1),
1562 public_key,
1563 status,
1564 updated_at: row.get(4),
1565 })
1566 }
1567
1568 pub async fn list_federation_peers(&self) -> Result<Vec<FederationPeerRecord>, StorageError> {
1570 let rows = self
1571 .client
1572 .query(
1573 "SELECT domain, endpoint, public_key, status, updated_at FROM federation_peer",
1574 &[],
1575 )
1576 .await
1577 .map_err(|_| StorageError::Postgres)?;
1578 let mut peers = Vec::with_capacity(rows.len());
1579 for row in rows {
1580 let key: Vec<u8> = row.get(2);
1581 let status: String = row.get(3);
1582 let status = FederationPeerStatus::from_str(status.as_str())?;
1583 let public_key: [u8; 32] = key
1584 .as_slice()
1585 .try_into()
1586 .map_err(|_| StorageError::Serialization)?;
1587 peers.push(FederationPeerRecord {
1588 domain: row.get(0),
1589 endpoint: row.get(1),
1590 public_key,
1591 status,
1592 updated_at: row.get(4),
1593 });
1594 }
1595 Ok(peers)
1596 }
1597
1598 pub async fn set_federation_peer_status(
1600 &self,
1601 domain: &str,
1602 status: FederationPeerStatus,
1603 ) -> Result<(), StorageError> {
1604 let now = Utc::now();
1605 let affected = self
1606 .client
1607 .execute(
1608 "UPDATE federation_peer SET status = $2, updated_at = $3 WHERE domain = $1",
1609 &[&domain, &status.as_str(), &now],
1610 )
1611 .await
1612 .map_err(|_| StorageError::Postgres)?;
1613 if affected == 0 {
1614 return Err(StorageError::Missing);
1615 }
1616 Ok(())
1617 }
1618
1619 pub async fn enqueue_relay(&self, envelope: &RelayEnvelope) -> Result<(), StorageError> {
1621 let query =
1622 "INSERT INTO relay_queue (envelope_id, channel_id, payload, deliver_after, expires_at)
1623 VALUES ($1, $2, $3, $4, $5)";
1624 self.client
1625 .execute(
1626 query,
1627 &[
1628 &envelope.envelope_id,
1629 &envelope.channel_id,
1630 &envelope.payload,
1631 &envelope.deliver_after,
1632 &envelope.expires_at,
1633 ],
1634 )
1635 .await
1636 .map_err(|_| StorageError::Postgres)?;
1637 Ok(())
1638 }
1639
1640 pub async fn claim_envelopes(
1642 &self,
1643 channel_id: &str,
1644 limit: i64,
1645 ) -> Result<Vec<RelayEnvelope>, StorageError> {
1646 let query = "DELETE FROM relay_queue
1647 WHERE envelope_id IN (
1648 SELECT envelope_id FROM relay_queue
1649 WHERE channel_id = $1 AND deliver_after <= now()
1650 ORDER BY deliver_after ASC
1651 LIMIT $2
1652 )
1653 RETURNING envelope_id, channel_id, payload, deliver_after, expires_at";
1654 let rows = self
1655 .client
1656 .query(query, &[&channel_id, &limit])
1657 .await
1658 .map_err(|_| StorageError::Postgres)?;
1659 Ok(rows
1660 .into_iter()
1661 .map(|row| RelayEnvelope {
1662 envelope_id: row.get(0),
1663 channel_id: row.get(1),
1664 payload: row.get(2),
1665 deliver_after: row.get(3),
1666 expires_at: row.get(4),
1667 })
1668 .collect())
1669 }
1670
1671 pub async fn enqueue_federation_outbox(
1673 &self,
1674 record: &FederationOutboxInsert<'_>,
1675 ) -> Result<(), StorageError> {
1676 self.client
1677 .execute(
1678 "INSERT INTO federation_outbox (outbox_id, destination_domain, endpoint, payload, public_key, next_attempt_at)
1679 VALUES ($1, $2, $3, $4, $5, $6)
1680 ON CONFLICT (outbox_id) DO NOTHING",
1681 &[
1682 &record.outbox_id,
1683 &record.destination,
1684 &record.endpoint,
1685 &record.payload,
1686 &&record.public_key[..],
1687 &record.next_attempt_at,
1688 ],
1689 )
1690 .await
1691 .map_err(|_| StorageError::Postgres)?;
1692 Ok(())
1693 }
1694
1695 pub async fn claim_federation_outbox(
1697 &self,
1698 limit: i64,
1699 lease: Duration,
1700 now: DateTime<Utc>,
1701 ) -> Result<Vec<FederationOutboxMessage>, StorageError> {
1702 let lease_deadline = now + lease;
1703 let query = "WITH due AS (
1704 SELECT outbox_id
1705 FROM federation_outbox
1706 WHERE next_attempt_at <= $1
1707 ORDER BY created_at ASC
1708 FOR UPDATE SKIP LOCKED
1709 LIMIT $2
1710 ),
1711 updated AS (
1712 UPDATE federation_outbox f
1713 SET next_attempt_at = $3,
1714 attempts = f.attempts + 1,
1715 last_error = NULL
1716 FROM due
1717 WHERE f.outbox_id = due.outbox_id
1718 RETURNING f.outbox_id, f.destination_domain, f.endpoint, f.payload, f.public_key, f.attempts, f.next_attempt_at, f.last_error
1719 )
1720 SELECT * FROM updated";
1721 let rows = self
1722 .client
1723 .query(query, &[&now, &limit, &lease_deadline])
1724 .await
1725 .map_err(|_| StorageError::Postgres)?;
1726 rows.into_iter()
1727 .map(|row| {
1728 let key: Vec<u8> = row.get(4);
1729 let public_key: [u8; 32] = key
1730 .as_slice()
1731 .try_into()
1732 .map_err(|_| StorageError::Serialization)?;
1733 Ok(FederationOutboxMessage {
1734 outbox_id: row.get(0),
1735 destination: row.get(1),
1736 endpoint: row.get(2),
1737 payload: row.get(3),
1738 public_key,
1739 attempts: row.get(5),
1740 next_attempt_at: row.get(6),
1741 last_error: row.get(7),
1742 })
1743 })
1744 .collect()
1745 }
1746
1747 pub async fn delete_federation_outbox(&self, outbox_id: &str) -> Result<(), StorageError> {
1749 let affected = self
1750 .client
1751 .execute(
1752 "DELETE FROM federation_outbox WHERE outbox_id = $1",
1753 &[&outbox_id],
1754 )
1755 .await
1756 .map_err(|_| StorageError::Postgres)?;
1757 if affected == 0 {
1758 return Err(StorageError::Missing);
1759 }
1760 Ok(())
1761 }
1762
1763 pub async fn reschedule_federation_outbox(
1765 &self,
1766 outbox_id: &str,
1767 delay: Duration,
1768 now: DateTime<Utc>,
1769 error: Option<&str>,
1770 ) -> Result<(), StorageError> {
1771 let next_attempt = now + delay;
1772 let affected = self
1773 .client
1774 .execute(
1775 "UPDATE federation_outbox SET next_attempt_at = $2, last_error = $3 WHERE outbox_id = $1",
1776 &[&outbox_id, &next_attempt, &error],
1777 )
1778 .await
1779 .map_err(|_| StorageError::Postgres)?;
1780 if affected == 0 {
1781 return Err(StorageError::Missing);
1782 }
1783 Ok(())
1784 }
1785
1786 pub async fn store_inbox_offset(&self, offset: &InboxOffset) -> Result<(), StorageError> {
1788 let query = "INSERT INTO inbox_offset (entity_id, channel_id, last_envelope_id, updated_at)
1789 VALUES ($1, $2, $3, $4)
1790 ON CONFLICT (entity_id, channel_id) DO UPDATE SET last_envelope_id = excluded.last_envelope_id, updated_at = excluded.updated_at";
1791 self.client
1792 .execute(
1793 query,
1794 &[
1795 &offset.entity_id,
1796 &offset.channel_id,
1797 &offset.last_envelope_id,
1798 &offset.updated_at,
1799 ],
1800 )
1801 .await
1802 .map_err(|_| StorageError::Postgres)?;
1803 Ok(())
1804 }
1805
1806 pub async fn read_inbox_offset(
1808 &self,
1809 entity_id: &str,
1810 channel_id: &str,
1811 ) -> Result<Option<InboxOffset>, StorageError> {
1812 let row = self
1813 .client
1814 .query_opt(
1815 "SELECT entity_id, channel_id, last_envelope_id, updated_at FROM inbox_offset WHERE entity_id = $1 AND channel_id = $2",
1816 &[&entity_id, &channel_id],
1817 )
1818 .await
1819 .map_err(|_| StorageError::Postgres)?;
1820 Ok(row.map(|row| InboxOffset {
1821 entity_id: row.get(0),
1822 channel_id: row.get(1),
1823 last_envelope_id: row.get(2),
1824 updated_at: row.get(3),
1825 }))
1826 }
1827
1828 pub async fn store_idempotency(&self, key: &IdempotencyKey) -> Result<bool, StorageError> {
1830 let query = "INSERT INTO idempotency (key, scope, created_at) VALUES ($1, $2, $3)
1831 ON CONFLICT (key, scope) DO NOTHING RETURNING 1";
1832 let row = self
1833 .client
1834 .query_opt(query, &[&key.key, &key.scope, &key.created_at])
1835 .await
1836 .map_err(|_| StorageError::Postgres)?;
1837 Ok(row.is_some())
1838 }
1839
1840 pub async fn create_friend_request(
1842 &self,
1843 id: &str,
1844 from_user_id: &str,
1845 to_user_id: &str,
1846 message: Option<&str>,
1847 ) -> Result<FriendRequest, StorageError> {
1848 let query = "INSERT INTO friend_requests (id, from_user_id, to_user_id, message, created_at, updated_at)
1849 VALUES ($1, $2, $3, $4, NOW(), NOW())
1850 ON CONFLICT (from_user_id, to_user_id) DO UPDATE
1851 SET status = 'pending', message = EXCLUDED.message, updated_at = NOW()
1852 RETURNING id, from_user_id, to_user_id, status, message, created_at, updated_at";
1853 let row = self
1854 .client
1855 .query_one(query, &[&id, &from_user_id, &to_user_id, &message])
1856 .await
1857 .map_err(|_| StorageError::Postgres)?;
1858 Ok(FriendRequest {
1859 id: row.get(0),
1860 from_user_id: row.get(1),
1861 to_user_id: row.get(2),
1862 status: row.get(3),
1863 message: row.get(4),
1864 created_at: row.get(5),
1865 updated_at: row.get(6),
1866 })
1867 }
1868
1869 pub async fn get_friend_request(&self, id: &str) -> Result<FriendRequest, StorageError> {
1871 let query = "SELECT id, from_user_id, to_user_id, status, message, created_at, updated_at
1872 FROM friend_requests WHERE id = $1";
1873 let row = self
1874 .client
1875 .query_opt(query, &[&id])
1876 .await
1877 .map_err(|_| StorageError::Postgres)?
1878 .ok_or(StorageError::Missing)?;
1879 Ok(FriendRequest {
1880 id: row.get(0),
1881 from_user_id: row.get(1),
1882 to_user_id: row.get(2),
1883 status: row.get(3),
1884 message: row.get(4),
1885 created_at: row.get(5),
1886 updated_at: row.get(6),
1887 })
1888 }
1889
1890 pub async fn list_incoming_friend_requests(
1892 &self,
1893 user_id: &str,
1894 ) -> Result<Vec<FriendRequest>, StorageError> {
1895 let query = "SELECT id, from_user_id, to_user_id, status, message, created_at, updated_at
1896 FROM friend_requests
1897 WHERE to_user_id = $1 AND status = 'pending'
1898 ORDER BY created_at DESC";
1899 let rows = self
1900 .client
1901 .query(query, &[&user_id])
1902 .await
1903 .map_err(|_| StorageError::Postgres)?;
1904 Ok(rows
1905 .iter()
1906 .map(|row| FriendRequest {
1907 id: row.get(0),
1908 from_user_id: row.get(1),
1909 to_user_id: row.get(2),
1910 status: row.get(3),
1911 message: row.get(4),
1912 created_at: row.get(5),
1913 updated_at: row.get(6),
1914 })
1915 .collect())
1916 }
1917
1918 pub async fn list_outgoing_friend_requests(
1920 &self,
1921 user_id: &str,
1922 ) -> Result<Vec<FriendRequest>, StorageError> {
1923 let query = "SELECT id, from_user_id, to_user_id, status, message, created_at, updated_at
1924 FROM friend_requests
1925 WHERE from_user_id = $1
1926 ORDER BY created_at DESC";
1927 let rows = self
1928 .client
1929 .query(query, &[&user_id])
1930 .await
1931 .map_err(|_| StorageError::Postgres)?;
1932 Ok(rows
1933 .iter()
1934 .map(|row| FriendRequest {
1935 id: row.get(0),
1936 from_user_id: row.get(1),
1937 to_user_id: row.get(2),
1938 status: row.get(3),
1939 message: row.get(4),
1940 created_at: row.get(5),
1941 updated_at: row.get(6),
1942 })
1943 .collect())
1944 }
1945
1946 pub async fn accept_friend_request(&self, id: &str) -> Result<FriendRequest, StorageError> {
1948 let query = "UPDATE friend_requests
1949 SET status = 'accepted', updated_at = NOW()
1950 WHERE id = $1 AND status = 'pending'
1951 RETURNING id, from_user_id, to_user_id, status, message, created_at, updated_at";
1952 let row = self
1953 .client
1954 .query_opt(query, &[&id])
1955 .await
1956 .map_err(|_| StorageError::Postgres)?
1957 .ok_or(StorageError::Missing)?;
1958 Ok(FriendRequest {
1959 id: row.get(0),
1960 from_user_id: row.get(1),
1961 to_user_id: row.get(2),
1962 status: row.get(3),
1963 message: row.get(4),
1964 created_at: row.get(5),
1965 updated_at: row.get(6),
1966 })
1967 }
1968
1969 pub async fn reject_friend_request(&self, id: &str) -> Result<FriendRequest, StorageError> {
1971 let query = "UPDATE friend_requests
1972 SET status = 'rejected', updated_at = NOW()
1973 WHERE id = $1 AND status = 'pending'
1974 RETURNING id, from_user_id, to_user_id, status, message, created_at, updated_at";
1975 let row = self
1976 .client
1977 .query_opt(query, &[&id])
1978 .await
1979 .map_err(|_| StorageError::Postgres)?
1980 .ok_or(StorageError::Missing)?;
1981 Ok(FriendRequest {
1982 id: row.get(0),
1983 from_user_id: row.get(1),
1984 to_user_id: row.get(2),
1985 status: row.get(3),
1986 message: row.get(4),
1987 created_at: row.get(5),
1988 updated_at: row.get(6),
1989 })
1990 }
1991
1992 pub async fn delete_friend_request(&self, id: &str) -> Result<(), StorageError> {
1994 let query = "DELETE FROM friend_requests WHERE id = $1";
1995 self.client
1996 .execute(query, &[&id])
1997 .await
1998 .map_err(|_| StorageError::Postgres)?;
1999 Ok(())
2000 }
2001
2002 pub async fn friend_request_exists(
2004 &self,
2005 from_user_id: &str,
2006 to_user_id: &str,
2007 ) -> Result<Option<FriendRequest>, StorageError> {
2008 let query = "SELECT id, from_user_id, to_user_id, status, message, created_at, updated_at
2009 FROM friend_requests
2010 WHERE from_user_id = $1 AND to_user_id = $2";
2011 let row = self
2012 .client
2013 .query_opt(query, &[&from_user_id, &to_user_id])
2014 .await
2015 .map_err(|_| StorageError::Postgres)?;
2016 Ok(row.map(|row| FriendRequest {
2017 id: row.get(0),
2018 from_user_id: row.get(1),
2019 to_user_id: row.get(2),
2020 status: row.get(3),
2021 message: row.get(4),
2022 created_at: row.get(5),
2023 updated_at: row.get(6),
2024 }))
2025 }
2026
2027 pub async fn publish_presence(&self, snapshot: &PresenceSnapshot) -> Result<(), StorageError> {
2029 let mut conn = self.redis.lock().await;
2030 let ttl = (snapshot.expires_at.timestamp() - Utc::now().timestamp()).max(1) as usize;
2031 let payload = serde_json::json!({
2032 "entity": snapshot.entity.clone(),
2033 "state": snapshot.state.clone(),
2034 "expires_at": snapshot.expires_at.to_rfc3339(),
2035 "user": presence_user_payload(snapshot),
2036 })
2037 .to_string();
2038 redis::cmd("SETEX")
2039 .arg(format!("presence:{}", snapshot.entity))
2040 .arg(ttl)
2041 .arg(payload)
2042 .query_async::<()>(&mut *conn)
2043 .await
2044 .map_err(|_| StorageError::Redis)?;
2045 Ok(())
2046 }
2047
2048 pub async fn read_presence(
2050 &self,
2051 entity: &str,
2052 ) -> Result<Option<PresenceSnapshot>, StorageError> {
2053 let mut conn = self.redis.lock().await;
2054 let value: Option<String> = redis::cmd("GET")
2055 .arg(format!("presence:{}", entity))
2056 .query_async::<Option<String>>(&mut *conn)
2057 .await
2058 .map_err(|_| StorageError::Redis)?;
2059 if let Some(json) = value {
2060 let parsed: Value =
2061 serde_json::from_str(&json).map_err(|_| StorageError::Serialization)?;
2062 let state = parsed
2063 .get("state")
2064 .and_then(|v| v.as_str())
2065 .unwrap_or("online")
2066 .to_string();
2067 let expires = parsed
2068 .get("expires_at")
2069 .and_then(|v| v.as_str())
2070 .ok_or(StorageError::Serialization)?;
2071 let expires = DateTime::parse_from_rfc3339(expires)
2072 .map_err(|_| StorageError::Serialization)?
2073 .with_timezone(&Utc);
2074 let user_obj = parsed.get("user").and_then(|v| v.as_object());
2075 let (user_id, handle, display_name, avatar_url) = if let Some(map) = user_obj {
2076 presence_user_fields(map)
2077 } else {
2078 (None, None, None, None)
2079 };
2080 Ok(Some(PresenceSnapshot {
2081 entity: entity.to_string(),
2082 state,
2083 expires_at: expires,
2084 user_id,
2085 handle,
2086 display_name,
2087 avatar_url,
2088 }))
2089 } else {
2090 Ok(None)
2091 }
2092 }
2093
2094 pub async fn register_route(
2096 &self,
2097 entity: &str,
2098 session_id: &str,
2099 ttl_seconds: i64,
2100 ) -> Result<(), StorageError> {
2101 let mut conn = self.redis.lock().await;
2102 redis::cmd("SETEX")
2103 .arg(format!("route:{}", entity))
2104 .arg(ttl_seconds.max(1) as usize)
2105 .arg(session_id)
2106 .query_async::<()>(&mut *conn)
2107 .await
2108 .map_err(|_| StorageError::Redis)?;
2109 Ok(())
2110 }
2111
2112 pub async fn clear_route(&self, entity: &str) -> Result<(), StorageError> {
2114 let mut conn = self.redis.lock().await;
2115 let _: () = redis::cmd("DEL")
2116 .arg(format!("route:{}", entity))
2117 .query_async::<()>(&mut *conn)
2118 .await
2119 .map_err(|_| StorageError::Redis)?;
2120 Ok(())
2121 }
2122}
2123
2124fn generate_pair_code() -> String {
2125 let mut seed = [0u8; PAIRING_CODE_LENGTH];
2126 OsRng.fill_bytes(&mut seed);
2127 let mut output = String::with_capacity(PAIRING_CODE_LENGTH + 1);
2128 for (index, byte) in seed.iter().enumerate() {
2129 let symbol = PAIRING_ALPHABET[(*byte as usize) % PAIRING_ALPHABET.len()] as char;
2130 output.push(symbol);
2131 if index == (PAIRING_CODE_LENGTH / 2) - 1 {
2132 output.push('-');
2133 }
2134 }
2135 if output.ends_with('-') {
2136 output.pop();
2137 }
2138 output
2139}
2140
2141#[cfg(test)]
2142mod tests {
2143 use super::*;
2144 use chrono::{Duration, Utc};
2145 use serde_json::json;
2146 use std::str::FromStr;
2147
2148 #[test]
2149 fn init_sql_exists() {
2150 assert!(INIT_SQL.contains("CREATE TABLE"));
2151 }
2152
2153 #[test]
2154 fn pairing_code_format() {
2155 let code = generate_pair_code();
2156 assert_eq!(code.len(), PAIRING_CODE_LENGTH + 1);
2157 assert!(code.contains('-'));
2158 }
2159
2160 #[test]
2161 fn init_sql_declares_new_relations() {
2162 assert!(INIT_SQL.contains("device_key_event"));
2163 assert!(INIT_SQL.contains("chat_group"));
2164 assert!(INIT_SQL.contains("group_member"));
2165 assert!(INIT_SQL.contains("federation_peer"));
2166 assert!(INIT_SQL.contains("inbox_offset"));
2167 }
2168
2169 #[test]
2170 fn pairing_sql_declares_pairing_table() {
2171 assert!(PAIRING_SQL.contains("device_pairing"));
2172 }
2173
2174 #[test]
2175 fn group_role_roundtrip() {
2176 assert_eq!(GroupRole::Owner.as_str(), "owner");
2177 assert_eq!(GroupRole::from_str("admin").unwrap(), GroupRole::Admin);
2178 assert!(GroupRole::from_str("unknown").is_err());
2179 }
2180
2181 #[test]
2182 fn federation_status_roundtrip() {
2183 assert_eq!(FederationPeerStatus::Active.as_str(), "active");
2184 assert_eq!(
2185 FederationPeerStatus::from_str("pending").unwrap(),
2186 FederationPeerStatus::Pending
2187 );
2188 assert!(FederationPeerStatus::from_str("offline").is_err());
2189 }
2190
2191 #[test]
2192 fn presence_user_payload_emits_aliases() {
2193 let now = Utc::now();
2194 let snapshot = PresenceSnapshot {
2195 entity: "dev-1".to_string(),
2196 state: "online".to_string(),
2197 expires_at: now + Duration::seconds(30),
2198 user_id: Some("user-1".to_string()),
2199 handle: Some("alice".to_string()),
2200 display_name: Some("Alice".to_string()),
2201 avatar_url: None,
2202 };
2203 let payload = presence_user_payload(&snapshot).expect("payload");
2204 assert_eq!(payload["id"], serde_json::json!("user-1"));
2205 assert_eq!(payload["user_id"], serde_json::json!("user-1"));
2206 assert_eq!(payload["handle"], serde_json::json!("alice"));
2207 }
2208
2209 #[test]
2210 fn presence_user_fields_accepts_user_id_alias() {
2211 let mut map = serde_json::Map::new();
2212 map.insert("user_id".to_string(), serde_json::json!("user-99"));
2213 map.insert("handle".to_string(), serde_json::json!("eve"));
2214 let (user_id, handle, display_name, avatar_url) = presence_user_fields(&map);
2215 assert_eq!(user_id.as_deref(), Some("user-99"));
2216 assert_eq!(handle.as_deref(), Some("eve"));
2217 assert!(display_name.is_none());
2218 assert!(avatar_url.is_none());
2219 }
2220
2221 #[tokio::test]
2222 async fn storage_integration_flow() -> Result<(), Box<dyn std::error::Error>> {
2223 let pg = match std::env::var("COMMUCAT_TEST_PG_DSN") {
2224 Ok(value) => value,
2225 Err(_) => {
2226 eprintln!("skipping storage_integration_flow: COMMUCAT_TEST_PG_DSN not set");
2227 return Ok(());
2228 }
2229 };
2230 let redis = match std::env::var("COMMUCAT_TEST_REDIS_URL") {
2231 Ok(value) => value,
2232 Err(_) => {
2233 eprintln!("skipping storage_integration_flow: COMMUCAT_TEST_REDIS_URL not set");
2234 return Ok(());
2235 }
2236 };
2237 let storage = connect(&pg, &redis).await?;
2238 storage.migrate().await?;
2239 let suffix = Utc::now().timestamp_nanos_opt().unwrap_or_default();
2240 let user_profile = NewUserProfile {
2241 user_id: format!("test-user-{}", suffix),
2242 handle: format!("tester{}", suffix),
2243 domain: "local".to_string(),
2244 display_name: Some("Tester".to_string()),
2245 avatar_url: None,
2246 };
2247 let created = storage.create_user(&user_profile).await?;
2248 let device_id = format!("test-device-{}", suffix);
2249 let device_record = DeviceRecord {
2250 device_id: device_id.clone(),
2251 user_id: created.user_id.clone(),
2252 public_key: vec![1; 32],
2253 status: "active".to_string(),
2254 created_at: Utc::now(),
2255 };
2256 storage.upsert_device(&device_record).await?;
2257 let key_event = DeviceKeyEvent {
2258 event_id: format!("evt-{}", suffix),
2259 device_id: device_id.clone(),
2260 public_key: device_record.public_key.clone(),
2261 recorded_at: Utc::now(),
2262 };
2263 storage.record_device_key_event(&key_event).await?;
2264 let latest = storage
2265 .latest_device_key_event(&device_id)
2266 .await?
2267 .expect("expected key event");
2268 assert_eq!(latest.public_key.len(), 32);
2269
2270 let new_key = vec![2u8; 32];
2271 let rotation_id = format!("rot-{}", suffix);
2272 let rotation_event_id = format!("evt-rot-{}", suffix);
2273 let signature = vec![3u8; 64];
2274 let nonce = vec![4u8; 16];
2275 let applied_at = Utc::now();
2276 let proof_expires_at = applied_at + Duration::seconds(120);
2277 let rotation_record = DeviceRotationRecord {
2278 rotation_id: &rotation_id,
2279 device_id: &device_id,
2280 user_id: &created.user_id,
2281 old_public_key: device_record.public_key.as_slice(),
2282 new_public_key: new_key.as_slice(),
2283 signature: signature.as_slice(),
2284 nonce: Some(nonce.as_slice()),
2285 proof_expires_at,
2286 applied_at,
2287 event_id: &rotation_event_id,
2288 };
2289 storage.apply_device_key_rotation(&rotation_record).await?;
2290 let rotated = storage
2291 .latest_device_key_event(&device_id)
2292 .await?
2293 .expect("expected rotation event");
2294 assert_eq!(rotated.event_id, rotation_event_id);
2295 assert_eq!(rotated.public_key, new_key);
2296 let audit_row = storage
2297 .client
2298 .query_one(
2299 "SELECT old_public_key, new_public_key, proof_expires_at FROM device_rotation_audit WHERE rotation_id = $1",
2300 &[&rotation_id],
2301 )
2302 .await?;
2303 let stored_old: Vec<u8> = audit_row.get(0);
2304 let stored_new: Vec<u8> = audit_row.get(1);
2305 let stored_expiry: DateTime<Utc> = audit_row.get(2);
2306 assert_eq!(stored_old, device_record.public_key);
2307 assert_eq!(stored_new, new_key);
2308 assert_eq!(stored_expiry, proof_expires_at);
2309
2310 let group = ChatGroup {
2311 group_id: format!("group-{}", suffix),
2312 owner_device: device_id.clone(),
2313 created_at: Utc::now(),
2314 };
2315 storage.create_group(&group).await?;
2316 let member = GroupMember {
2317 group_id: group.group_id.clone(),
2318 device_id: format!("peer-device-{}", suffix),
2319 role: GroupRole::Member,
2320 joined_at: Utc::now(),
2321 };
2322 storage.add_group_member(&member).await?;
2323 let members = storage.list_group_members(&group.group_id).await?;
2324 assert!(members.iter().any(|m| m.device_id == device_id));
2325 assert!(members.iter().any(|m| m.device_id == member.device_id));
2326 let memberships = storage.list_groups_for_device(&member.device_id).await?;
2327 assert_eq!(memberships.len(), 1);
2328 storage
2329 .remove_group_member(&group.group_id, &member.device_id)
2330 .await?;
2331
2332 let peer = FederationPeerRecord {
2333 domain: format!("peer{}.example", suffix),
2334 endpoint: "https://peer.example/federation".to_string(),
2335 public_key: [5u8; 32],
2336 status: FederationPeerStatus::Active,
2337 updated_at: Utc::now(),
2338 };
2339 storage.upsert_federation_peer(&peer).await?;
2340 let fetched = storage.load_federation_peer(&peer.domain).await?;
2341 assert_eq!(fetched.endpoint, peer.endpoint);
2342
2343 let event_json = serde_json::json!({
2344 "event": {
2345 "event_id": format!("fed-{}", suffix),
2346 "origin": "remote.example",
2347 "created_at": Utc::now().to_rfc3339(),
2348 "payload": serde_json::json!({"channel": 1, "payload": "00"}),
2349 "scope": "relay",
2350 },
2351 "signature": vec![0u8; 64],
2352 "digest": vec![0u8; 32],
2353 });
2354 let outbox_id = format!("outbox-{}", suffix);
2355 let outbox_insert = FederationOutboxInsert {
2356 outbox_id: &outbox_id,
2357 destination: &peer.domain,
2358 endpoint: &peer.endpoint,
2359 payload: &event_json,
2360 public_key: &peer.public_key,
2361 next_attempt_at: Utc::now(),
2362 };
2363 storage.enqueue_federation_outbox(&outbox_insert).await?;
2364 let claimed = storage
2365 .claim_federation_outbox(8, Duration::seconds(30), Utc::now())
2366 .await?;
2367 assert_eq!(claimed.len(), 1);
2368 assert_eq!(claimed[0].outbox_id, outbox_id);
2369 storage
2370 .reschedule_federation_outbox(
2371 &outbox_id,
2372 Duration::seconds(5),
2373 Utc::now(),
2374 Some("boom"),
2375 )
2376 .await?;
2377 let after_delay = storage
2378 .claim_federation_outbox(8, Duration::seconds(30), Utc::now() + Duration::seconds(6))
2379 .await?;
2380 assert_eq!(after_delay.len(), 1);
2381 assert_eq!(after_delay[0].outbox_id, outbox_id);
2382 storage.delete_federation_outbox(&outbox_id).await?;
2383
2384 let offset = InboxOffset {
2385 entity_id: device_id.clone(),
2386 channel_id: format!("inbox:{}", device_id),
2387 last_envelope_id: Some(format!("env-{}", suffix)),
2388 updated_at: Utc::now(),
2389 };
2390 storage.store_inbox_offset(&offset).await?;
2391 let loaded = storage
2392 .read_inbox_offset(&offset.entity_id, &offset.channel_id)
2393 .await?
2394 .expect("offset present");
2395 assert_eq!(loaded.last_envelope_id, offset.last_envelope_id);
2396
2397 let ticket = storage
2398 .create_pairing_token(&created.user_id, &device_id, 300)
2399 .await?;
2400 assert_eq!(ticket.pair_code.len(), 9);
2401 let paired_device = format!("paired-device-{}", suffix);
2402 let claim = storage
2403 .claim_pairing_token(&ticket.pair_code, &paired_device, &[7u8; 32])
2404 .await?;
2405 assert_eq!(claim.user.user_id, created.user_id);
2406 assert_eq!(claim.issuer_device_id, device_id);
2407 storage
2408 .client
2409 .execute(
2410 "INSERT INTO device_pairing (pair_code, user_id, issuer_device_id, issued_at, expires_at, attempts) VALUES ($1, $2, $3, now(), now() - interval '10 minutes', 0)",
2411 &[&format!("expired-{}", suffix), &created.user_id, &device_id],
2412 )
2413 .await
2414 .map_err(|_| StorageError::Postgres)?;
2415 let purged = storage.invalidate_expired_pairings().await?;
2416 assert!(purged >= 1);
2417 storage
2418 .write_user_blob(&created.user_id, "friends", "[]")
2419 .await?;
2420 let blob = storage.read_user_blob(&created.user_id, "friends").await?;
2421 assert_eq!(blob.as_deref(), Some("[]"));
2422
2423 let secret_name = format!("noise-static-{}", suffix);
2424 let now = Utc::now();
2425 let secret_record = ServerSecretRecord {
2426 name: secret_name.clone(),
2427 version: 1,
2428 secret: vec![9u8; 32],
2429 public: Some(vec![8u8; 32]),
2430 metadata: json!({"purpose": "test"}),
2431 created_at: now,
2432 valid_after: now,
2433 rotates_at: now + Duration::hours(1),
2434 expires_at: now + Duration::hours(2),
2435 };
2436 storage.insert_server_secret(&secret_record).await?;
2437 let latest = storage.latest_server_secret_version(&secret_name).await?;
2438 assert_eq!(latest, 1);
2439 let active = storage
2440 .active_server_secrets(&secret_name, now + Duration::minutes(30))
2441 .await?;
2442 assert_eq!(active.len(), 1);
2443 assert_eq!(active[0].public.as_ref().map(|v| v.len()), Some(32));
2444 assert_eq!(active[0].metadata["purpose"], json!("test"));
2445 let removed = storage
2446 .delete_expired_server_secrets(&secret_name, now + Duration::hours(3))
2447 .await?;
2448 assert_eq!(removed, 1);
2449
2450 let idempotency_key = IdempotencyKey {
2451 key: format!("key-{}", suffix),
2452 scope: "federation-test".to_string(),
2453 created_at: Utc::now(),
2454 };
2455 assert!(storage.store_idempotency(&idempotency_key).await?);
2456 assert!(!storage.store_idempotency(&idempotency_key).await?);
2457
2458 Ok(())
2459 }
2460}