1use chrono::{DateTime, Utc};
2use serde_json::Value;
3use std::convert::TryInto;
4use std::error::Error;
5use std::fmt::{Display, Formatter};
6use std::str::FromStr;
7use std::sync::Arc;
8use tokio::sync::Mutex;
9use tokio::task::JoinHandle;
10use tokio_postgres::{Client, NoTls};
11
12const INIT_SQL: &str = include_str!("../migrations/001_init.sql");
13
/// Coarse classification of failures reported by the storage layer.
///
/// The variants carry no payload: the underlying driver error is discarded at
/// the point where it is converted into a `StorageError`.
#[derive(Debug)]
pub enum StorageError {
    /// A PostgreSQL operation failed.
    Postgres,
    /// A Redis operation failed.
    Redis,
    /// A stored value could not be decoded into its in-memory representation.
    Serialization,
    /// The requested record does not exist.
    Missing,
}

impl Display for StorageError {
    /// Writes the fixed human-readable message that belongs to the variant.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::Postgres => "postgres failure",
            Self::Redis => "redis failure",
            Self::Serialization => "serialization failure",
            Self::Missing => "missing record",
        };
        f.write_str(message)
    }
}

impl Error for StorageError {}
34
35pub struct Storage {
36 client: Client,
37 _pg_task: JoinHandle<()>,
38 redis: Arc<Mutex<redis::aio::MultiplexedConnection>>,
39}
40
/// Caller-supplied fields for a user that is about to be inserted into
/// `app_user`; timestamps are added by `Storage::create_user`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NewUserProfile {
    /// Value for the `user_id` column.
    pub user_id: String,
    /// Value for the `handle` column.
    pub handle: String,
    /// Optional display name; `None` is stored as SQL `NULL`.
    pub display_name: Option<String>,
    /// Optional avatar location; `None` is stored as SQL `NULL`.
    pub avatar_url: Option<String>,
}
48
49#[derive(Debug, Clone, PartialEq, Eq)]
50pub struct UserProfile {
51 pub user_id: String,
52 pub handle: String,
53 pub display_name: Option<String>,
54 pub avatar_url: Option<String>,
55 pub created_at: DateTime<Utc>,
56 pub updated_at: DateTime<Utc>,
57}
58
59#[derive(Debug, Clone, PartialEq, Eq)]
60pub struct DeviceRecord {
61 pub device_id: String,
62 pub user_id: String,
63 pub public_key: Vec<u8>,
64 pub status: String,
65 pub created_at: DateTime<Utc>,
66}
67
68#[derive(Debug, Clone, PartialEq, Eq)]
69pub struct SessionRecord {
70 pub session_id: String,
71 pub user_id: String,
72 pub device_id: String,
73 pub tls_fingerprint: String,
74 pub created_at: DateTime<Utc>,
75 pub ttl_seconds: i64,
76}
77
78#[derive(Debug, Clone, PartialEq, Eq)]
79pub struct RelayEnvelope {
80 pub envelope_id: String,
81 pub channel_id: String,
82 pub payload: Vec<u8>,
83 pub deliver_after: DateTime<Utc>,
84 pub expires_at: DateTime<Utc>,
85}
86
87#[derive(Debug, Clone, PartialEq, Eq)]
88pub struct IdempotencyKey {
89 pub key: String,
90 pub scope: String,
91 pub created_at: DateTime<Utc>,
92}
93
94#[derive(Debug, Clone, PartialEq, Eq)]
95pub struct PresenceSnapshot {
96 pub entity: String,
97 pub state: String,
98 pub expires_at: DateTime<Utc>,
99 pub user_id: Option<String>,
100 pub handle: Option<String>,
101 pub display_name: Option<String>,
102 pub avatar_url: Option<String>,
103}
104
105#[derive(Debug, Clone, PartialEq, Eq)]
106pub struct DeviceKeyEvent {
107 pub event_id: String,
108 pub device_id: String,
109 pub public_key: Vec<u8>,
110 pub recorded_at: DateTime<Utc>,
111}
112
113#[derive(Debug, Clone, PartialEq, Eq)]
114pub struct ChatGroup {
115 pub group_id: String,
116 pub owner_device: String,
117 pub created_at: DateTime<Utc>,
118}
119
/// Role of a device inside a chat group.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum GroupRole {
    /// Persisted as `"owner"`.
    Owner,
    /// Persisted as `"admin"`.
    Admin,
    /// Persisted as `"member"`.
    Member,
}

impl GroupRole {
    /// Returns the lowercase text under which the role is stored.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Owner => "owner",
            Self::Admin => "admin",
            Self::Member => "member",
        }
    }
}
136
137impl FromStr for GroupRole {
138 type Err = StorageError;
139
140 fn from_str(value: &str) -> Result<Self, Self::Err> {
141 match value {
142 "owner" => Ok(GroupRole::Owner),
143 "admin" => Ok(GroupRole::Admin),
144 "member" => Ok(GroupRole::Member),
145 _ => Err(StorageError::Serialization),
146 }
147 }
148}
149
150#[derive(Debug, Clone, PartialEq, Eq)]
151pub struct GroupMember {
152 pub group_id: String,
153 pub device_id: String,
154 pub role: GroupRole,
155 pub joined_at: DateTime<Utc>,
156}
157
/// Status recorded for a federation peer.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum FederationPeerStatus {
    /// Persisted as `"active"`.
    Active,
    /// Persisted as `"pending"`.
    Pending,
    /// Persisted as `"blocked"`.
    Blocked,
}

impl FederationPeerStatus {
    /// Returns the lowercase text under which the status is stored.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Active => "active",
            Self::Pending => "pending",
            Self::Blocked => "blocked",
        }
    }
}
174
175impl FromStr for FederationPeerStatus {
176 type Err = StorageError;
177
178 fn from_str(value: &str) -> Result<Self, Self::Err> {
179 match value {
180 "active" => Ok(FederationPeerStatus::Active),
181 "pending" => Ok(FederationPeerStatus::Pending),
182 "blocked" => Ok(FederationPeerStatus::Blocked),
183 _ => Err(StorageError::Serialization),
184 }
185 }
186}
187
188#[derive(Debug, Clone, PartialEq, Eq)]
189pub struct FederationPeerRecord {
190 pub domain: String,
191 pub endpoint: String,
192 pub public_key: [u8; 32],
193 pub status: FederationPeerStatus,
194 pub updated_at: DateTime<Utc>,
195}
196
197#[derive(Debug, Clone, PartialEq, Eq)]
198pub struct InboxOffset {
199 pub entity_id: String,
200 pub channel_id: String,
201 pub last_envelope_id: Option<String>,
202 pub updated_at: DateTime<Utc>,
203}
204
205pub async fn connect(postgres_dsn: &str, redis_url: &str) -> Result<Storage, StorageError> {
207 let (client, connection) = tokio_postgres::connect(postgres_dsn, NoTls)
208 .await
209 .map_err(|_| StorageError::Postgres)?;
210 let task = tokio::spawn(async move {
211 if let Err(error) = connection.await {
212 tracing::error!("postgres connection stopped: {}", error);
213 }
214 });
215 let redis_client = redis::Client::open(redis_url).map_err(|_| StorageError::Redis)?;
216 let redis_connection = redis_client
217 .get_multiplexed_async_connection()
218 .await
219 .map_err(|_| StorageError::Redis)?;
220 Ok(Storage {
221 client,
222 _pg_task: task,
223 redis: Arc::new(Mutex::new(redis_connection)),
224 })
225}
226
227impl Storage {
228 pub async fn migrate(&self) -> Result<(), StorageError> {
230 self.client
231 .batch_execute(INIT_SQL)
232 .await
233 .map_err(|_| StorageError::Postgres)
234 }
235
236 pub async fn readiness(&self) -> Result<(), StorageError> {
238 self.client
239 .simple_query("SELECT 1")
240 .await
241 .map_err(|_| StorageError::Postgres)?;
242 let mut conn = self.redis.lock().await;
243 let _: String = redis::cmd("PING")
244 .query_async::<_, String>(&mut *conn)
245 .await
246 .map_err(|_| StorageError::Redis)?;
247 Ok(())
248 }
249
250 pub async fn upsert_device(&self, record: &DeviceRecord) -> Result<(), StorageError> {
252 let query = "INSERT INTO user_device (opaque_id, user_id, pubkey, status, created_at) VALUES ($1, $2, $3, $4, $5)
253 ON CONFLICT (opaque_id) DO UPDATE SET pubkey = excluded.pubkey, status = excluded.status
254 WHERE user_device.user_id = excluded.user_id";
255 self.client
256 .execute(
257 query,
258 &[
259 &record.device_id,
260 &record.user_id,
261 &record.public_key,
262 &record.status,
263 &record.created_at,
264 ],
265 )
266 .await
267 .map_err(|_| StorageError::Postgres)?;
268 Ok(())
269 }
270
271 pub async fn record_device_key_event(
273 &self,
274 event: &DeviceKeyEvent,
275 ) -> Result<(), StorageError> {
276 let query = "INSERT INTO device_key_event (event_id, device_id, public_key, recorded_at) VALUES ($1, $2, $3, $4)";
277 self.client
278 .execute(
279 query,
280 &[
281 &event.event_id,
282 &event.device_id,
283 &event.public_key,
284 &event.recorded_at,
285 ],
286 )
287 .await
288 .map_err(|_| StorageError::Postgres)?;
289 Ok(())
290 }
291
292 pub async fn latest_device_key_event(
294 &self,
295 device_id: &str,
296 ) -> Result<Option<DeviceKeyEvent>, StorageError> {
297 let query = "SELECT event_id, device_id, public_key, recorded_at
298 FROM device_key_event WHERE device_id = $1 ORDER BY recorded_at DESC LIMIT 1";
299 let row = self
300 .client
301 .query_opt(query, &[&device_id])
302 .await
303 .map_err(|_| StorageError::Postgres)?;
304 Ok(row.map(|row| DeviceKeyEvent {
305 event_id: row.get(0),
306 device_id: row.get(1),
307 public_key: row.get(2),
308 recorded_at: row.get(3),
309 }))
310 }
311
312 pub async fn record_session(&self, session: &SessionRecord) -> Result<(), StorageError> {
314 let query =
315 "INSERT INTO session (opaque_id, user_id, device_id, tls_fingerprint, created_at, ttl_seconds)
316 VALUES ($1, $2, $3, $4, $5, $6)";
317 self.client
318 .execute(
319 query,
320 &[
321 &session.session_id,
322 &session.user_id,
323 &session.device_id,
324 &session.tls_fingerprint,
325 &session.created_at,
326 &session.ttl_seconds,
327 ],
328 )
329 .await
330 .map_err(|_| StorageError::Postgres)?;
331 Ok(())
332 }
333
334 pub async fn load_device(&self, device_id: &str) -> Result<DeviceRecord, StorageError> {
336 let row = self
337 .client
338 .query_opt(
339 "SELECT opaque_id, user_id, pubkey, status, created_at FROM user_device WHERE opaque_id = $1",
340 &[&device_id],
341 )
342 .await
343 .map_err(|_| StorageError::Postgres)?;
344 let row = row.ok_or(StorageError::Missing)?;
345 Ok(DeviceRecord {
346 device_id: row.get(0),
347 user_id: row.get(1),
348 public_key: row.get(2),
349 status: row.get(3),
350 created_at: row.get(4),
351 })
352 }
353
354 pub async fn create_user(&self, profile: &NewUserProfile) -> Result<UserProfile, StorageError> {
356 let now = Utc::now();
357 let row = self
358 .client
359 .query_one(
360 "INSERT INTO app_user (user_id, handle, display_name, avatar_url, created_at, updated_at)
361 VALUES ($1, $2, $3, $4, $5, $5)
362 RETURNING user_id, handle, display_name, avatar_url, created_at, updated_at",
363 &[
364 &profile.user_id,
365 &profile.handle,
366 &profile.display_name,
367 &profile.avatar_url,
368 &now,
369 ],
370 )
371 .await
372 .map_err(|_| StorageError::Postgres)?;
373 Ok(UserProfile {
374 user_id: row.get(0),
375 handle: row.get(1),
376 display_name: row.get(2),
377 avatar_url: row.get(3),
378 created_at: row.get(4),
379 updated_at: row.get(5),
380 })
381 }
382
383 pub async fn load_user(&self, user_id: &str) -> Result<UserProfile, StorageError> {
385 let row = self
386 .client
387 .query_opt(
388 "SELECT user_id, handle, display_name, avatar_url, created_at, updated_at FROM app_user WHERE user_id = $1",
389 &[&user_id],
390 )
391 .await
392 .map_err(|_| StorageError::Postgres)?;
393 let row = row.ok_or(StorageError::Missing)?;
394 Ok(UserProfile {
395 user_id: row.get(0),
396 handle: row.get(1),
397 display_name: row.get(2),
398 avatar_url: row.get(3),
399 created_at: row.get(4),
400 updated_at: row.get(5),
401 })
402 }
403
404 pub async fn load_user_by_handle(&self, handle: &str) -> Result<UserProfile, StorageError> {
406 let row = self
407 .client
408 .query_opt(
409 "SELECT user_id, handle, display_name, avatar_url, created_at, updated_at FROM app_user WHERE handle = $1",
410 &[&handle],
411 )
412 .await
413 .map_err(|_| StorageError::Postgres)?;
414 let row = row.ok_or(StorageError::Missing)?;
415 Ok(UserProfile {
416 user_id: row.get(0),
417 handle: row.get(1),
418 display_name: row.get(2),
419 avatar_url: row.get(3),
420 created_at: row.get(4),
421 updated_at: row.get(5),
422 })
423 }
424
425 pub async fn update_user_profile(
427 &self,
428 user_id: &str,
429 display_name: Option<&str>,
430 avatar_url: Option<&str>,
431 ) -> Result<(), StorageError> {
432 let now = Utc::now();
433 let affected = self
434 .client
435 .execute(
436 "UPDATE app_user SET display_name = COALESCE($2, display_name), avatar_url = COALESCE($3, avatar_url), updated_at = $4 WHERE user_id = $1",
437 &[&user_id, &display_name, &avatar_url, &now],
438 )
439 .await
440 .map_err(|_| StorageError::Postgres)?;
441 if affected == 0 {
442 return Err(StorageError::Missing);
443 }
444 Ok(())
445 }
446
447 pub async fn create_group(&self, group: &ChatGroup) -> Result<(), StorageError> {
449 self.client
450 .execute(
451 "INSERT INTO chat_group (group_id, owner_device, created_at) VALUES ($1, $2, $3)
452 ON CONFLICT (group_id) DO NOTHING",
453 &[&group.group_id, &group.owner_device, &group.created_at],
454 )
455 .await
456 .map_err(|_| StorageError::Postgres)?;
457 self.client
458 .execute(
459 "INSERT INTO group_member (group_id, device_id, role, joined_at) VALUES ($1, $2, $3, $4)
460 ON CONFLICT (group_id, device_id) DO UPDATE SET role = excluded.role",
461 &[
462 &group.group_id,
463 &group.owner_device,
464 &GroupRole::Owner.as_str(),
465 &group.created_at,
466 ],
467 )
468 .await
469 .map_err(|_| StorageError::Postgres)?;
470 Ok(())
471 }
472
473 pub async fn add_group_member(&self, member: &GroupMember) -> Result<(), StorageError> {
475 let query = "INSERT INTO group_member (group_id, device_id, role, joined_at) VALUES ($1, $2, $3, $4)
476 ON CONFLICT (group_id, device_id) DO UPDATE SET role = excluded.role, joined_at = excluded.joined_at";
477 self.client
478 .execute(
479 query,
480 &[
481 &member.group_id,
482 &member.device_id,
483 &member.role.as_str(),
484 &member.joined_at,
485 ],
486 )
487 .await
488 .map_err(|_| StorageError::Postgres)?;
489 Ok(())
490 }
491
492 pub async fn remove_group_member(
494 &self,
495 group_id: &str,
496 device_id: &str,
497 ) -> Result<(), StorageError> {
498 let affected = self
499 .client
500 .execute(
501 "DELETE FROM group_member WHERE group_id = $1 AND device_id = $2",
502 &[&group_id, &device_id],
503 )
504 .await
505 .map_err(|_| StorageError::Postgres)?;
506 if affected == 0 {
507 return Err(StorageError::Missing);
508 }
509 Ok(())
510 }
511
512 pub async fn list_group_members(
514 &self,
515 group_id: &str,
516 ) -> Result<Vec<GroupMember>, StorageError> {
517 let rows = self
518 .client
519 .query(
520 "SELECT group_id, device_id, role, joined_at FROM group_member WHERE group_id = $1 ORDER BY joined_at ASC",
521 &[&group_id],
522 )
523 .await
524 .map_err(|_| StorageError::Postgres)?;
525 let mut members = Vec::with_capacity(rows.len());
526 for row in rows {
527 let role: String = row.get(2);
528 let parsed = GroupRole::from_str(role.as_str())?;
529 members.push(GroupMember {
530 group_id: row.get(0),
531 device_id: row.get(1),
532 role: parsed,
533 joined_at: row.get(3),
534 });
535 }
536 Ok(members)
537 }
538
539 pub async fn load_group(&self, group_id: &str) -> Result<ChatGroup, StorageError> {
541 let row = self
542 .client
543 .query_opt(
544 "SELECT group_id, owner_device, created_at FROM chat_group WHERE group_id = $1",
545 &[&group_id],
546 )
547 .await
548 .map_err(|_| StorageError::Postgres)?;
549 let row = row.ok_or(StorageError::Missing)?;
550 Ok(ChatGroup {
551 group_id: row.get(0),
552 owner_device: row.get(1),
553 created_at: row.get(2),
554 })
555 }
556
557 pub async fn list_groups_for_device(
559 &self,
560 device_id: &str,
561 ) -> Result<Vec<ChatGroup>, StorageError> {
562 let rows = self
563 .client
564 .query(
565 "SELECT g.group_id, g.owner_device, g.created_at FROM chat_group g
566 INNER JOIN group_member m ON g.group_id = m.group_id
567 WHERE m.device_id = $1 ORDER BY g.created_at ASC",
568 &[&device_id],
569 )
570 .await
571 .map_err(|_| StorageError::Postgres)?;
572 Ok(rows
573 .into_iter()
574 .map(|row| ChatGroup {
575 group_id: row.get(0),
576 owner_device: row.get(1),
577 created_at: row.get(2),
578 })
579 .collect())
580 }
581
582 pub async fn upsert_federation_peer(
584 &self,
585 peer: &FederationPeerRecord,
586 ) -> Result<(), StorageError> {
587 let query = "INSERT INTO federation_peer (domain, endpoint, public_key, status, updated_at)
588 VALUES ($1, $2, $3, $4, $5)
589 ON CONFLICT (domain) DO UPDATE SET endpoint = excluded.endpoint, public_key = excluded.public_key, status = excluded.status, updated_at = excluded.updated_at";
590 self.client
591 .execute(
592 query,
593 &[
594 &peer.domain,
595 &peer.endpoint,
596 &peer.public_key.as_slice(),
597 &peer.status.as_str(),
598 &peer.updated_at,
599 ],
600 )
601 .await
602 .map_err(|_| StorageError::Postgres)?;
603 Ok(())
604 }
605
606 pub async fn load_federation_peer(
608 &self,
609 domain: &str,
610 ) -> Result<FederationPeerRecord, StorageError> {
611 let row = self
612 .client
613 .query_opt(
614 "SELECT domain, endpoint, public_key, status, updated_at FROM federation_peer WHERE domain = $1",
615 &[&domain],
616 )
617 .await
618 .map_err(|_| StorageError::Postgres)?;
619 let row = row.ok_or(StorageError::Missing)?;
620 let key: Vec<u8> = row.get(2);
621 let status: String = row.get(3);
622 let status = FederationPeerStatus::from_str(status.as_str())?;
623 let public_key: [u8; 32] = key
624 .as_slice()
625 .try_into()
626 .map_err(|_| StorageError::Serialization)?;
627 Ok(FederationPeerRecord {
628 domain: row.get(0),
629 endpoint: row.get(1),
630 public_key,
631 status,
632 updated_at: row.get(4),
633 })
634 }
635
636 pub async fn list_federation_peers(&self) -> Result<Vec<FederationPeerRecord>, StorageError> {
638 let rows = self
639 .client
640 .query(
641 "SELECT domain, endpoint, public_key, status, updated_at FROM federation_peer",
642 &[],
643 )
644 .await
645 .map_err(|_| StorageError::Postgres)?;
646 let mut peers = Vec::with_capacity(rows.len());
647 for row in rows {
648 let key: Vec<u8> = row.get(2);
649 let status: String = row.get(3);
650 let status = FederationPeerStatus::from_str(status.as_str())?;
651 let public_key: [u8; 32] = key
652 .as_slice()
653 .try_into()
654 .map_err(|_| StorageError::Serialization)?;
655 peers.push(FederationPeerRecord {
656 domain: row.get(0),
657 endpoint: row.get(1),
658 public_key,
659 status,
660 updated_at: row.get(4),
661 });
662 }
663 Ok(peers)
664 }
665
666 pub async fn set_federation_peer_status(
668 &self,
669 domain: &str,
670 status: FederationPeerStatus,
671 ) -> Result<(), StorageError> {
672 let now = Utc::now();
673 let affected = self
674 .client
675 .execute(
676 "UPDATE federation_peer SET status = $2, updated_at = $3 WHERE domain = $1",
677 &[&domain, &status.as_str(), &now],
678 )
679 .await
680 .map_err(|_| StorageError::Postgres)?;
681 if affected == 0 {
682 return Err(StorageError::Missing);
683 }
684 Ok(())
685 }
686
687 pub async fn enqueue_relay(&self, envelope: &RelayEnvelope) -> Result<(), StorageError> {
689 let query =
690 "INSERT INTO relay_queue (envelope_id, channel_id, payload, deliver_after, expires_at)
691 VALUES ($1, $2, $3, $4, $5)";
692 self.client
693 .execute(
694 query,
695 &[
696 &envelope.envelope_id,
697 &envelope.channel_id,
698 &envelope.payload,
699 &envelope.deliver_after,
700 &envelope.expires_at,
701 ],
702 )
703 .await
704 .map_err(|_| StorageError::Postgres)?;
705 Ok(())
706 }
707
708 pub async fn claim_envelopes(
710 &self,
711 channel_id: &str,
712 limit: i64,
713 ) -> Result<Vec<RelayEnvelope>, StorageError> {
714 let query = "DELETE FROM relay_queue
715 WHERE envelope_id IN (
716 SELECT envelope_id FROM relay_queue
717 WHERE channel_id = $1 AND deliver_after <= now()
718 ORDER BY deliver_after ASC
719 LIMIT $2
720 )
721 RETURNING envelope_id, channel_id, payload, deliver_after, expires_at";
722 let rows = self
723 .client
724 .query(query, &[&channel_id, &limit])
725 .await
726 .map_err(|_| StorageError::Postgres)?;
727 Ok(rows
728 .into_iter()
729 .map(|row| RelayEnvelope {
730 envelope_id: row.get(0),
731 channel_id: row.get(1),
732 payload: row.get(2),
733 deliver_after: row.get(3),
734 expires_at: row.get(4),
735 })
736 .collect())
737 }
738
739 pub async fn store_inbox_offset(&self, offset: &InboxOffset) -> Result<(), StorageError> {
741 let query = "INSERT INTO inbox_offset (entity_id, channel_id, last_envelope_id, updated_at)
742 VALUES ($1, $2, $3, $4)
743 ON CONFLICT (entity_id, channel_id) DO UPDATE SET last_envelope_id = excluded.last_envelope_id, updated_at = excluded.updated_at";
744 self.client
745 .execute(
746 query,
747 &[
748 &offset.entity_id,
749 &offset.channel_id,
750 &offset.last_envelope_id,
751 &offset.updated_at,
752 ],
753 )
754 .await
755 .map_err(|_| StorageError::Postgres)?;
756 Ok(())
757 }
758
759 pub async fn read_inbox_offset(
761 &self,
762 entity_id: &str,
763 channel_id: &str,
764 ) -> Result<Option<InboxOffset>, StorageError> {
765 let row = self
766 .client
767 .query_opt(
768 "SELECT entity_id, channel_id, last_envelope_id, updated_at FROM inbox_offset WHERE entity_id = $1 AND channel_id = $2",
769 &[&entity_id, &channel_id],
770 )
771 .await
772 .map_err(|_| StorageError::Postgres)?;
773 Ok(row.map(|row| InboxOffset {
774 entity_id: row.get(0),
775 channel_id: row.get(1),
776 last_envelope_id: row.get(2),
777 updated_at: row.get(3),
778 }))
779 }
780
781 pub async fn store_idempotency(&self, key: &IdempotencyKey) -> Result<(), StorageError> {
783 let query = "INSERT INTO idempotency (key, scope, created_at) VALUES ($1, $2, $3)
784 ON CONFLICT (key, scope) DO NOTHING";
785 self.client
786 .execute(query, &[&key.key, &key.scope, &key.created_at])
787 .await
788 .map_err(|_| StorageError::Postgres)?;
789 Ok(())
790 }
791
792 pub async fn publish_presence(&self, snapshot: &PresenceSnapshot) -> Result<(), StorageError> {
794 let mut conn = self.redis.lock().await;
795 let ttl = (snapshot.expires_at.timestamp() - Utc::now().timestamp()).max(1) as usize;
796 let payload = serde_json::json!({
797 "entity": snapshot.entity,
798 "state": snapshot.state,
799 "expires_at": snapshot.expires_at.to_rfc3339(),
800 "user": snapshot.user_id.as_ref().map(|id| serde_json::json!({
801 "id": id,
802 "handle": snapshot.handle.clone(),
803 "display_name": snapshot.display_name.clone(),
804 "avatar_url": snapshot.avatar_url.clone(),
805 })),
806 })
807 .to_string();
808 redis::cmd("SETEX")
809 .arg(format!("presence:{}", snapshot.entity))
810 .arg(ttl)
811 .arg(payload)
812 .query_async::<_, ()>(&mut *conn)
813 .await
814 .map_err(|_| StorageError::Redis)?;
815 Ok(())
816 }
817
818 pub async fn read_presence(
820 &self,
821 entity: &str,
822 ) -> Result<Option<PresenceSnapshot>, StorageError> {
823 let mut conn = self.redis.lock().await;
824 let value: Option<String> = redis::cmd("GET")
825 .arg(format!("presence:{}", entity))
826 .query_async::<_, Option<String>>(&mut *conn)
827 .await
828 .map_err(|_| StorageError::Redis)?;
829 if let Some(json) = value {
830 let parsed: Value =
831 serde_json::from_str(&json).map_err(|_| StorageError::Serialization)?;
832 let state = parsed
833 .get("state")
834 .and_then(|v| v.as_str())
835 .unwrap_or("online")
836 .to_string();
837 let expires = parsed
838 .get("expires_at")
839 .and_then(|v| v.as_str())
840 .ok_or(StorageError::Serialization)?;
841 let expires = DateTime::parse_from_rfc3339(expires)
842 .map_err(|_| StorageError::Serialization)?
843 .with_timezone(&Utc);
844 let user_obj = parsed.get("user").and_then(|v| v.as_object());
845 let user_id = user_obj
846 .and_then(|map| map.get("id"))
847 .and_then(|v| v.as_str())
848 .map(|v| v.to_string());
849 let handle = user_obj
850 .and_then(|map| map.get("handle"))
851 .and_then(|v| v.as_str())
852 .map(|v| v.to_string());
853 let display_name = user_obj
854 .and_then(|map| map.get("display_name"))
855 .and_then(|v| v.as_str())
856 .map(|v| v.to_string());
857 let avatar_url = user_obj
858 .and_then(|map| map.get("avatar_url"))
859 .and_then(|v| v.as_str())
860 .map(|v| v.to_string());
861 Ok(Some(PresenceSnapshot {
862 entity: entity.to_string(),
863 state,
864 expires_at: expires,
865 user_id,
866 handle,
867 display_name,
868 avatar_url,
869 }))
870 } else {
871 Ok(None)
872 }
873 }
874
875 pub async fn register_route(
877 &self,
878 entity: &str,
879 session_id: &str,
880 ttl_seconds: i64,
881 ) -> Result<(), StorageError> {
882 let mut conn = self.redis.lock().await;
883 redis::cmd("SETEX")
884 .arg(format!("route:{}", entity))
885 .arg(ttl_seconds.max(1) as usize)
886 .arg(session_id)
887 .query_async::<_, ()>(&mut *conn)
888 .await
889 .map_err(|_| StorageError::Redis)?;
890 Ok(())
891 }
892
893 pub async fn clear_route(&self, entity: &str) -> Result<(), StorageError> {
895 let mut conn = self.redis.lock().await;
896 let _: () = redis::cmd("DEL")
897 .arg(format!("route:{}", entity))
898 .query_async::<_, ()>(&mut *conn)
899 .await
900 .map_err(|_| StorageError::Redis)?;
901 Ok(())
902 }
903}
904
905#[cfg(test)]
906mod tests {
907 use super::*;
908 use chrono::Utc;
909 use std::str::FromStr;
910
911 #[test]
912 fn init_sql_exists() {
913 assert!(INIT_SQL.contains("CREATE TABLE"));
914 }
915
916 #[test]
917 fn init_sql_declares_new_relations() {
918 assert!(INIT_SQL.contains("device_key_event"));
919 assert!(INIT_SQL.contains("chat_group"));
920 assert!(INIT_SQL.contains("group_member"));
921 assert!(INIT_SQL.contains("federation_peer"));
922 assert!(INIT_SQL.contains("inbox_offset"));
923 }
924
925 #[test]
926 fn group_role_roundtrip() {
927 assert_eq!(GroupRole::Owner.as_str(), "owner");
928 assert_eq!(GroupRole::from_str("admin").unwrap(), GroupRole::Admin);
929 assert!(GroupRole::from_str("unknown").is_err());
930 }
931
932 #[test]
933 fn federation_status_roundtrip() {
934 assert_eq!(FederationPeerStatus::Active.as_str(), "active");
935 assert_eq!(
936 FederationPeerStatus::from_str("pending").unwrap(),
937 FederationPeerStatus::Pending
938 );
939 assert!(FederationPeerStatus::from_str("offline").is_err());
940 }
941
942 #[tokio::test]
943 async fn storage_integration_flow() -> Result<(), Box<dyn std::error::Error>> {
944 let pg = match std::env::var("COMMUCAT_TEST_PG_DSN") {
945 Ok(value) => value,
946 Err(_) => {
947 eprintln!("skipping storage_integration_flow: COMMUCAT_TEST_PG_DSN not set");
948 return Ok(());
949 }
950 };
951 let redis = match std::env::var("COMMUCAT_TEST_REDIS_URL") {
952 Ok(value) => value,
953 Err(_) => {
954 eprintln!("skipping storage_integration_flow: COMMUCAT_TEST_REDIS_URL not set");
955 return Ok(());
956 }
957 };
958 let storage = connect(&pg, &redis).await?;
959 storage.migrate().await?;
960 let suffix = Utc::now().timestamp_nanos_opt().unwrap_or_default();
961 let user_profile = NewUserProfile {
962 user_id: format!("test-user-{}", suffix),
963 handle: format!("tester{}", suffix),
964 display_name: Some("Tester".to_string()),
965 avatar_url: None,
966 };
967 let created = storage.create_user(&user_profile).await?;
968 let device_id = format!("test-device-{}", suffix);
969 let device_record = DeviceRecord {
970 device_id: device_id.clone(),
971 user_id: created.user_id.clone(),
972 public_key: vec![1; 32],
973 status: "active".to_string(),
974 created_at: Utc::now(),
975 };
976 storage.upsert_device(&device_record).await?;
977 let key_event = DeviceKeyEvent {
978 event_id: format!("evt-{}", suffix),
979 device_id: device_id.clone(),
980 public_key: device_record.public_key.clone(),
981 recorded_at: Utc::now(),
982 };
983 storage.record_device_key_event(&key_event).await?;
984 let latest = storage
985 .latest_device_key_event(&device_id)
986 .await?
987 .expect("expected key event");
988 assert_eq!(latest.public_key.len(), 32);
989
990 let group = ChatGroup {
991 group_id: format!("group-{}", suffix),
992 owner_device: device_id.clone(),
993 created_at: Utc::now(),
994 };
995 storage.create_group(&group).await?;
996 let member = GroupMember {
997 group_id: group.group_id.clone(),
998 device_id: format!("peer-device-{}", suffix),
999 role: GroupRole::Member,
1000 joined_at: Utc::now(),
1001 };
1002 storage.add_group_member(&member).await?;
1003 let members = storage.list_group_members(&group.group_id).await?;
1004 assert!(members.iter().any(|m| m.device_id == device_id));
1005 assert!(members.iter().any(|m| m.device_id == member.device_id));
1006 let memberships = storage.list_groups_for_device(&member.device_id).await?;
1007 assert_eq!(memberships.len(), 1);
1008 storage
1009 .remove_group_member(&group.group_id, &member.device_id)
1010 .await?;
1011
1012 let peer = FederationPeerRecord {
1013 domain: format!("peer{}.example", suffix),
1014 endpoint: "https://peer.example/federation".to_string(),
1015 public_key: [5u8; 32],
1016 status: FederationPeerStatus::Active,
1017 updated_at: Utc::now(),
1018 };
1019 storage.upsert_federation_peer(&peer).await?;
1020 let fetched = storage.load_federation_peer(&peer.domain).await?;
1021 assert_eq!(fetched.endpoint, peer.endpoint);
1022
1023 let offset = InboxOffset {
1024 entity_id: device_id.clone(),
1025 channel_id: format!("inbox:{}", device_id),
1026 last_envelope_id: Some(format!("env-{}", suffix)),
1027 updated_at: Utc::now(),
1028 };
1029 storage.store_inbox_offset(&offset).await?;
1030 let loaded = storage
1031 .read_inbox_offset(&offset.entity_id, &offset.channel_id)
1032 .await?
1033 .expect("offset present");
1034 assert_eq!(loaded.last_envelope_id, offset.last_envelope_id);
1035 Ok(())
1036 }
1037}