1use std::path::Path;
14
15use anyhow::Context as _;
16use surrealdb::{
17 Surreal,
18 engine::local::{Db, Mem, SurrealKv},
19 types::{SurrealValue, Value},
20};
21
22use crate::base::{Res, Visibility, Void};
23
/// SurrealDB namespace selected by `Store::init` for every connection.
const NAMESPACE: &str = "conclave";
/// SurrealDB database (inside [`NAMESPACE`]) selected by `Store::init`.
const DATABASE: &str = "conclave";
27
/// SurrealQL applied on every open.
///
/// Every statement uses `IF NOT EXISTS`, so re-applying it to an already
/// initialised database is expected to be harmless. The `UNIQUE` indexes are
/// what make duplicate usernames, pubkeys, per-user machine names, channel
/// names, invite tokens, memberships and bans fail at insert time.
const SCHEMA: &str = concat!(
    "DEFINE INDEX IF NOT EXISTS user_username ON user FIELDS username UNIQUE;\n",
    "DEFINE INDEX IF NOT EXISTS machine_pubkey ON machine FIELDS pubkey UNIQUE;\n",
    "DEFINE INDEX IF NOT EXISTS machine_user_name ON machine FIELDS user, name UNIQUE;\n",
    "DEFINE INDEX IF NOT EXISTS channel_name ON channel FIELDS name UNIQUE;\n",
    "DEFINE INDEX IF NOT EXISTS invite_token ON invite FIELDS token UNIQUE;\n",
    "DEFINE INDEX IF NOT EXISTS membership_channel_user ON membership FIELDS channel, user UNIQUE;\n",
    "DEFINE INDEX IF NOT EXISTS ban_channel_user ON ban FIELDS channel, user UNIQUE;\n",
    "DEFINE TABLE IF NOT EXISTS meta;\n",
    "DEFINE TABLE IF NOT EXISTS message;\n",
    "DEFINE INDEX IF NOT EXISTS message_channel_ts ON message FIELDS channel, ts_ms;\n",
);
42
43#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
46pub struct MessageRecord {
47 pub channel: String,
49 pub from: String,
51 pub payload: Vec<u8>,
53 pub ts_ms: i64,
55}
56
57#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
59pub struct MetaRecord {
60 pub instance_id: String,
62}
63
64#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
66pub struct UserRecord {
67 pub username: String,
69 pub created_at: String,
71}
72
73#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
76pub struct MachineRecord {
77 pub user: String,
79 pub name: String,
81 pub pubkey: String,
83 pub added_at: String,
85}
86
87#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
91pub struct ChannelRecord {
92 pub name: String,
94 pub visibility: String,
96 pub created_by: String,
98 pub created_at: String,
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
104pub struct InviteRecord {
105 pub channel: String,
107 pub token: String,
109 pub uses_remaining: Option<i64>,
111 pub expires_at: Option<String>,
113 pub created_by: String,
115}
116
117#[derive(SurrealValue)]
120struct ByUsername {
121 username: String,
122}
123
124#[derive(SurrealValue)]
125struct ByPubkey {
126 pubkey: String,
127}
128
129#[derive(SurrealValue)]
130struct ByUser {
131 user: String,
132}
133
134#[derive(SurrealValue)]
135struct ByName {
136 name: String,
137}
138
139#[derive(SurrealValue)]
140struct ByToken {
141 tok: String,
143}
144
145#[derive(SurrealValue)]
146struct ByUserAndName {
147 user: String,
148 name: String,
149}
150
151#[derive(SurrealValue)]
152struct SetVisibility {
153 name: String,
154 visibility: String,
155}
156
157#[derive(SurrealValue)]
158struct Rename {
159 old: String,
160 new: String,
161}
162
163#[derive(SurrealValue)]
164struct ReadSinceBind {
165 channel: String,
166 since_ms: i64,
167 cap: i64,
168}
169
170#[derive(SurrealValue)]
171struct PurgeBind {
172 cutoff_ms: i64,
173}
174
175#[derive(SurrealValue)]
176struct SetUses {
177 tok: String,
179 uses: i64,
180}
181
182#[derive(SurrealValue)]
183struct Membership {
184 channel: String,
185 user: String,
186}
187
188#[derive(SurrealValue)]
189struct ByChannel {
190 channel: String,
191}
192
/// Upper bound on how many times a write is attempted when the engine keeps
/// reporting a write conflict.
const MAX_WRITE_ATTEMPTS: usize = 64;
196
197fn is_write_conflict(err: &surrealdb::Error) -> bool {
203 err.to_string().to_lowercase().contains("conflict")
204}
205
206#[derive(Clone)]
209pub struct Store {
210 db: Surreal<Db>,
211}
212
213impl Store {
214 pub async fn open(path: &Path) -> Res<Self> {
220 let db = Surreal::new::<SurrealKv>(path.to_string_lossy().as_ref()).await.context("failed to open the embedded store")?;
221 Self::init(db).await
222 }
223
224 pub async fn open_in_memory() -> Res<Self> {
230 let db = Surreal::new::<Mem>(()).await.context("failed to open the in-memory store")?;
231 Self::init(db).await
232 }
233
234 async fn init(db: Surreal<Db>) -> Res<Self> {
235 db.use_ns(NAMESPACE).use_db(DATABASE).await.context("failed to select namespace/database")?;
236 db.query(SCHEMA).await.context("failed to apply schema")?.check().context("schema application reported an error")?;
237 Ok(Self { db })
238 }
239
240 async fn insert<T: SurrealValue>(&self, table: &str, record: T) -> Void {
241 let _created: Option<Value> = self.db.create(table.to_owned()).content(record).await.with_context(|| format!("failed to insert into `{table}`"))?;
242 Ok(())
243 }
244
245 pub async fn instance_id(&self) -> Res<String> {
254 let mut response = self.db.query("SELECT * OMIT id FROM meta").await.context("failed to query server meta")?;
255 let rows: Vec<MetaRecord> = response.take(0).context("failed to decode server meta")?;
256 if let Some(meta) = rows.into_iter().next() {
257 return Ok(meta.instance_id);
258 }
259
260 let id = crate::identity::generate_token()?;
261 let created: Result<Option<MetaRecord>, _> = self.db.create(("meta", "server")).content(MetaRecord { instance_id: id.clone() }).await;
264 if created.is_ok() {
265 return Ok(id);
266 }
267 let mut response = self.db.query("SELECT * OMIT id FROM meta").await.context("failed to re-query server meta")?;
268 let rows: Vec<MetaRecord> = response.take(0).context("failed to decode server meta")?;
269 rows.into_iter().next().map(|meta| meta.instance_id).context("server meta write raced but no record exists")
270 }
271
272 pub async fn append_message(&self, channel: &str, from: &str, payload: &[u8], ts_ms: i64) -> Void {
278 self.insert(
279 "message",
280 MessageRecord {
281 channel: channel.to_owned(),
282 from: from.to_owned(),
283 payload: payload.to_vec(),
284 ts_ms,
285 },
286 )
287 .await
288 }
289
290 pub async fn read_messages_since(&self, channel: &str, since_ms: i64, cap: usize) -> Res<Vec<MessageRecord>> {
297 let mut response = self
298 .db
299 .query("SELECT * OMIT id FROM message WHERE channel = $channel AND ts_ms > $since_ms ORDER BY ts_ms ASC LIMIT $cap")
300 .bind(ReadSinceBind {
301 channel: channel.to_owned(),
302 since_ms,
303 cap: i64::try_from(cap).unwrap_or(i64::MAX),
304 })
305 .await
306 .context("failed to query message history")?;
307 let rows: Vec<MessageRecord> = response.take(0).context("failed to decode message history")?;
308 Ok(rows)
309 }
310
311 pub async fn purge_messages_before(&self, cutoff_ms: i64) -> Void {
317 self.db
318 .query("DELETE FROM message WHERE ts_ms < $cutoff_ms")
319 .bind(PurgeBind { cutoff_ms })
320 .await
321 .context("failed to purge message history")?
322 .check()
323 .context("message purge reported an error")?;
324 Ok(())
325 }
326
327 pub async fn create_user(&self, username: &str) -> Res<UserRecord> {
333 let record = UserRecord {
334 username: username.to_owned(),
335 created_at: now_rfc3339(),
336 };
337 self.insert("user", record.clone()).await?;
338 Ok(record)
339 }
340
341 pub async fn get_user(&self, username: &str) -> Res<Option<UserRecord>> {
347 let mut response = self
348 .db
349 .query("SELECT * OMIT id FROM user WHERE username = $username")
350 .bind(ByUsername { username: username.to_owned() })
351 .await
352 .context("failed to query user")?;
353 let rows: Vec<UserRecord> = response.take(0).context("failed to decode user rows")?;
354 Ok(rows.into_iter().next())
355 }
356
357 pub async fn create_machine(&self, user: &str, name: &str, pubkey_base64: &str) -> Res<MachineRecord> {
364 let record = MachineRecord {
365 user: user.to_owned(),
366 name: name.to_owned(),
367 pubkey: pubkey_base64.to_owned(),
368 added_at: now_rfc3339(),
369 };
370 self.insert("machine", record.clone()).await?;
371 Ok(record)
372 }
373
374 pub async fn get_machine_by_pubkey(&self, pubkey_base64: &str) -> Res<Option<MachineRecord>> {
380 let mut response = self
381 .db
382 .query("SELECT * OMIT id FROM machine WHERE pubkey = $pubkey")
383 .bind(ByPubkey { pubkey: pubkey_base64.to_owned() })
384 .await
385 .context("failed to query machine")?;
386 let rows: Vec<MachineRecord> = response.take(0).context("failed to decode machine rows")?;
387 Ok(rows.into_iter().next())
388 }
389
390 pub async fn list_machines(&self, user: &str) -> Res<Vec<MachineRecord>> {
396 let mut response = self
397 .db
398 .query("SELECT * OMIT id FROM machine WHERE user = $user")
399 .bind(ByUser { user: user.to_owned() })
400 .await
401 .context("failed to list machines")?;
402 response.take(0).context("failed to decode machine rows")
403 }
404
405 pub async fn delete_machine(&self, user: &str, name: &str) -> Void {
411 self.db
412 .query("DELETE machine WHERE user = $user AND name = $name")
413 .bind(ByUserAndName {
414 user: user.to_owned(),
415 name: name.to_owned(),
416 })
417 .await
418 .context("failed to delete machine")?
419 .check()
420 .context("machine delete reported an error")?;
421 Ok(())
422 }
423
424 pub async fn create_channel(&self, name: &str, visibility: Visibility, created_by: &str) -> Res<ChannelRecord> {
430 let record = ChannelRecord {
431 name: name.to_owned(),
432 visibility: visibility.as_str().to_owned(),
433 created_by: created_by.to_owned(),
434 created_at: now_rfc3339(),
435 };
436 self.insert("channel", record.clone()).await?;
437 self.add_channel_member(name, created_by).await?;
439 Ok(record)
440 }
441
442 pub async fn get_channel(&self, name: &str) -> Res<Option<ChannelRecord>> {
448 let mut response = self
449 .db
450 .query("SELECT * OMIT id FROM channel WHERE name = $name")
451 .bind(ByName { name: name.to_owned() })
452 .await
453 .context("failed to query channel")?;
454 let rows: Vec<ChannelRecord> = response.take(0).context("failed to decode channel rows")?;
455 Ok(rows.into_iter().next())
456 }
457
458 pub async fn create_invite(&self, channel: &str, token: &str, uses_remaining: Option<i64>, expires_at: Option<String>, created_by: &str) -> Res<InviteRecord> {
464 let record = InviteRecord {
465 channel: channel.to_owned(),
466 token: token.to_owned(),
467 uses_remaining,
468 expires_at,
469 created_by: created_by.to_owned(),
470 };
471 self.insert("invite", record.clone()).await?;
472 Ok(record)
473 }
474
475 pub async fn get_invite(&self, token: &str) -> Res<Option<InviteRecord>> {
481 let mut response = self
482 .db
483 .query("SELECT * OMIT id FROM invite WHERE token = $tok")
484 .bind(ByToken { tok: token.to_owned() })
485 .await
486 .context("failed to query invite")?;
487 let rows: Vec<InviteRecord> = response.take(0).context("failed to decode invite rows")?;
488 Ok(rows.into_iter().next())
489 }
490
491 pub async fn list_invites(&self, channel: &str) -> Res<Vec<InviteRecord>> {
497 let mut response = self
498 .db
499 .query("SELECT * OMIT id FROM invite WHERE channel = $channel")
500 .bind(ByChannel { channel: channel.to_owned() })
501 .await
502 .context("failed to list invites")?;
503 response.take(0).context("failed to decode invite rows")
504 }
505
506 pub async fn list_channels(&self) -> Res<Vec<ChannelRecord>> {
512 let mut response = self.db.query("SELECT * OMIT id FROM channel").await.context("failed to list channels")?;
513 response.take(0).context("failed to decode channel rows")
514 }
515
516 pub async fn add_channel_member(&self, channel: &str, user: &str) -> Void {
525 for attempt in 0..MAX_WRITE_ATTEMPTS {
526 let outcome = self
527 .db
528 .query("INSERT INTO membership { channel: $channel, user: $user } ON DUPLICATE KEY UPDATE channel = $channel")
529 .bind(Membership {
530 channel: channel.to_owned(),
531 user: user.to_owned(),
532 })
533 .await
534 .and_then(surrealdb::IndexedResults::check);
535 match outcome {
536 Ok(_) => return Ok(()),
537 Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
538 Err(e) => return Err(anyhow::Error::new(e).context("failed to add channel member")),
539 }
540 }
541 anyhow::bail!("adding a channel member exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
542 }
543
544 pub async fn remove_channel_member(&self, channel: &str, user: &str) -> Void {
550 for attempt in 0..MAX_WRITE_ATTEMPTS {
551 let outcome = self
552 .db
553 .query("DELETE membership WHERE channel = $channel AND user = $user")
554 .bind(Membership {
555 channel: channel.to_owned(),
556 user: user.to_owned(),
557 })
558 .await
559 .and_then(surrealdb::IndexedResults::check);
560 match outcome {
561 Ok(_) => return Ok(()),
562 Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
563 Err(e) => return Err(anyhow::Error::new(e).context("failed to remove channel member")),
564 }
565 }
566 anyhow::bail!("removing a channel member exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
567 }
568
569 pub async fn is_channel_member(&self, channel: &str, user: &str) -> Res<bool> {
575 let mut response = self
576 .db
577 .query("SELECT VALUE user FROM membership WHERE channel = $channel AND user = $user")
578 .bind(Membership {
579 channel: channel.to_owned(),
580 user: user.to_owned(),
581 })
582 .await
583 .context("failed to query membership")?;
584 let rows: Vec<String> = response.take(0).context("failed to decode membership rows")?;
585 Ok(!rows.is_empty())
586 }
587
588 pub async fn list_user_memberships(&self, user: &str) -> Res<Vec<String>> {
594 let mut response = self
595 .db
596 .query("SELECT VALUE channel FROM membership WHERE user = $user")
597 .bind(ByUser { user: user.to_owned() })
598 .await
599 .context("failed to list user memberships")?;
600 response.take(0).context("failed to decode membership channels")
601 }
602
603 pub async fn list_channel_members(&self, channel: &str) -> Res<Vec<String>> {
609 let mut response = self
610 .db
611 .query("SELECT VALUE user FROM membership WHERE channel = $channel")
612 .bind(ByChannel { channel: channel.to_owned() })
613 .await
614 .context("failed to list channel members")?;
615 response.take(0).context("failed to decode membership users")
616 }
617
618 pub async fn add_ban(&self, channel: &str, user: &str) -> Void {
625 for attempt in 0..MAX_WRITE_ATTEMPTS {
626 let outcome = self
627 .db
628 .query("INSERT INTO ban { channel: $channel, user: $user } ON DUPLICATE KEY UPDATE channel = $channel")
629 .bind(Membership {
630 channel: channel.to_owned(),
631 user: user.to_owned(),
632 })
633 .await
634 .and_then(surrealdb::IndexedResults::check);
635 match outcome {
636 Ok(_) => return Ok(()),
637 Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
638 Err(e) => return Err(anyhow::Error::new(e).context("failed to add ban")),
639 }
640 }
641 anyhow::bail!("adding a ban exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
642 }
643
644 pub async fn remove_ban(&self, channel: &str, user: &str) -> Void {
650 for attempt in 0..MAX_WRITE_ATTEMPTS {
651 let outcome = self
652 .db
653 .query("DELETE ban WHERE channel = $channel AND user = $user")
654 .bind(Membership {
655 channel: channel.to_owned(),
656 user: user.to_owned(),
657 })
658 .await
659 .and_then(surrealdb::IndexedResults::check);
660 match outcome {
661 Ok(_) => return Ok(()),
662 Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
663 Err(e) => return Err(anyhow::Error::new(e).context("failed to remove ban")),
664 }
665 }
666 anyhow::bail!("removing a ban exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
667 }
668
669 pub async fn list_bans(&self) -> Res<Vec<(String, String)>> {
675 let mut response = self.db.query("SELECT channel, user FROM ban").await.context("failed to list bans")?;
676 let rows: Vec<Membership> = response.take(0).context("failed to decode ban rows")?;
677 Ok(rows.into_iter().map(|row| (row.channel, row.user)).collect())
678 }
679
680 pub async fn list_channel_bans(&self, channel: &str) -> Res<Vec<String>> {
686 let mut response = self
687 .db
688 .query("SELECT VALUE user FROM ban WHERE channel = $channel")
689 .bind(ByChannel { channel: channel.to_owned() })
690 .await
691 .context("failed to list channel bans")?;
692 response.take(0).context("failed to decode ban users")
693 }
694
695 pub async fn list_channels_created_by(&self, user: &str) -> Res<Vec<String>> {
701 let mut response = self
702 .db
703 .query("SELECT VALUE name FROM channel WHERE created_by = $user")
704 .bind(ByUser { user: user.to_owned() })
705 .await
706 .context("failed to list created channels")?;
707 response.take(0).context("failed to decode channel names")
708 }
709
710 pub async fn delete_user_memberships(&self, user: &str) -> Void {
716 self.db
717 .query("DELETE membership WHERE user = $user")
718 .bind(ByUser { user: user.to_owned() })
719 .await
720 .context("failed to delete user memberships")?
721 .check()
722 .context("user membership delete reported an error")?;
723 Ok(())
724 }
725
726 pub async fn set_channel_visibility(&self, name: &str, visibility: Visibility) -> Void {
732 self.db
733 .query("UPDATE channel SET visibility = $visibility WHERE name = $name")
734 .bind(SetVisibility {
735 name: name.to_owned(),
736 visibility: visibility.as_str().to_owned(),
737 })
738 .await
739 .context("failed to update channel visibility")?
740 .check()
741 .context("channel visibility update reported an error")?;
742 Ok(())
743 }
744
745 pub async fn rename_channel(&self, old: &str, new: &str) -> Void {
751 self.db
752 .query("UPDATE channel SET name = $new WHERE name = $old")
753 .bind(Rename { old: old.to_owned(), new: new.to_owned() })
754 .await
755 .context("failed to rename channel")?
756 .check()
757 .context("channel rename reported an error")?;
758 self.db
760 .query("UPDATE membership SET channel = $new WHERE channel = $old")
761 .bind(Rename { old: old.to_owned(), new: new.to_owned() })
762 .await
763 .context("failed to migrate channel memberships")?
764 .check()
765 .context("membership rename reported an error")?;
766 self.db
767 .query("UPDATE invite SET channel = $new WHERE channel = $old")
768 .bind(Rename { old: old.to_owned(), new: new.to_owned() })
769 .await
770 .context("failed to migrate channel invites")?
771 .check()
772 .context("invite rename reported an error")?;
773 self.db
774 .query("UPDATE ban SET channel = $new WHERE channel = $old")
775 .bind(Rename { old: old.to_owned(), new: new.to_owned() })
776 .await
777 .context("failed to migrate channel bans")?
778 .check()
779 .context("ban rename reported an error")?;
780 self.db
781 .query("UPDATE message SET channel = $new WHERE channel = $old")
782 .bind(Rename { old: old.to_owned(), new: new.to_owned() })
783 .await
784 .context("failed to migrate channel history")?
785 .check()
786 .context("message rename reported an error")?;
787 Ok(())
788 }
789
790 pub async fn delete_channel(&self, name: &str) -> Void {
796 self.db
797 .query("DELETE channel WHERE name = $name")
798 .bind(ByName { name: name.to_owned() })
799 .await
800 .context("failed to delete channel")?
801 .check()
802 .context("channel delete reported an error")?;
803 self.db
806 .query("DELETE membership WHERE channel = $channel")
807 .bind(ByChannel { channel: name.to_owned() })
808 .await
809 .context("failed to delete channel memberships")?
810 .check()
811 .context("membership delete reported an error")?;
812 self.db
813 .query("DELETE invite WHERE channel = $channel")
814 .bind(ByChannel { channel: name.to_owned() })
815 .await
816 .context("failed to delete channel invites")?
817 .check()
818 .context("invite delete reported an error")?;
819 self.db
820 .query("DELETE ban WHERE channel = $channel")
821 .bind(ByChannel { channel: name.to_owned() })
822 .await
823 .context("failed to delete channel bans")?
824 .check()
825 .context("ban delete reported an error")?;
826 self.db
829 .query("DELETE message WHERE channel = $channel")
830 .bind(ByChannel { channel: name.to_owned() })
831 .await
832 .context("failed to delete channel history")?
833 .check()
834 .context("message delete reported an error")?;
835 Ok(())
836 }
837
838 pub async fn set_invite_uses(&self, token: &str, uses_remaining: i64) -> Void {
844 self.db
845 .query("UPDATE invite SET uses_remaining = $uses WHERE token = $tok")
846 .bind(SetUses {
847 tok: token.to_owned(),
848 uses: uses_remaining,
849 })
850 .await
851 .context("failed to update invite uses")?
852 .check()
853 .context("invite uses update reported an error")?;
854 Ok(())
855 }
856
857 pub async fn try_consume_invite_use(&self, token: &str) -> Res<bool> {
867 for attempt in 0..MAX_WRITE_ATTEMPTS {
868 let outcome = self
869 .db
870 .query("UPDATE invite SET uses_remaining = uses_remaining - 1 WHERE token = $tok AND uses_remaining > 0 RETURN VALUE uses_remaining")
871 .bind(ByToken { tok: token.to_owned() })
872 .await
873 .and_then(surrealdb::IndexedResults::check);
874 match outcome {
875 Ok(mut response) => {
876 let remaining: Vec<i64> = response.take(0).context("failed to decode invite uses")?;
877 return match remaining.into_iter().next() {
878 None => Ok(false),
880 Some(0) => {
882 self.delete_invite(token).await?;
883 Ok(true)
884 }
885 Some(_) => Ok(true),
887 };
888 }
889 Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
890 Err(e) => return Err(anyhow::Error::new(e).context("failed to consume invite use")),
891 }
892 }
893 anyhow::bail!("consuming an invite use exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
894 }
895
896 pub async fn delete_invite(&self, token: &str) -> Void {
902 self.db
903 .query("DELETE invite WHERE token = $tok")
904 .bind(ByToken { tok: token.to_owned() })
905 .await
906 .context("failed to delete invite")?
907 .check()
908 .context("invite delete reported an error")?;
909 Ok(())
910 }
911
912 pub async fn list_users(&self) -> Res<Vec<UserRecord>> {
918 let mut response = self.db.query("SELECT * OMIT id FROM user").await.context("failed to list users")?;
919 response.take(0).context("failed to decode user rows")
920 }
921
922 pub async fn delete_user(&self, username: &str) -> Void {
928 self.db
929 .query("DELETE user WHERE username = $username")
930 .bind(ByUsername { username: username.to_owned() })
931 .await
932 .context("failed to delete user")?
933 .check()
934 .context("user delete reported an error")?;
935 Ok(())
936 }
937}
938
939fn now_rfc3339() -> String {
940 chrono::Utc::now().to_rfc3339()
941}
942
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)]

    use super::*;
    use pretty_assertions::assert_eq;

    /// Fresh in-memory store for a single test.
    async fn store() -> Store {
        Store::open_in_memory().await.unwrap()
    }

    /// The instance id is generated once and survives closing and reopening
    /// the on-disk store.
    #[tokio::test]
    async fn store_instance_id_is_stable_across_reopen() {
        let dir = tempfile::TempDir::new().unwrap();

        // The store is scoped to this block so it is dropped before the reopen.
        let first = {
            let store = Store::open(dir.path()).await.unwrap();
            let generated = store.instance_id().await.unwrap();
            assert!(!generated.is_empty());
            assert_eq!(store.instance_id().await.unwrap(), generated);
            generated
        };

        // Reopening may not succeed immediately after the drop, so poll.
        let reopened = 'reopen: {
            for _ in 0..50 {
                if let Ok(store) = Store::open(dir.path()).await {
                    break 'reopen store;
                }
                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            }
            panic!("the store never became reopenable after drop");
        };
        assert_eq!(reopened.instance_id().await.unwrap(), first);
    }

    /// History reads are per channel, ordered, exclusive of `since`, and capped.
    #[tokio::test]
    async fn store_messages_append_and_read_since_in_order() {
        let store = store().await;
        store.append_message("ops", "aaron/ws/s1", b"one", 100).await.unwrap();
        store.append_message("ops", "aaron/ws/s1", b"two", 200).await.unwrap();
        store.append_message("other", "aaron/ws/s1", b"elsewhere", 150).await.unwrap();

        let msgs = store.read_messages_since("ops", 50, 10).await.unwrap();
        let payloads = msgs.iter().map(|m| m.payload.as_slice()).collect::<Vec<_>>();
        assert_eq!(payloads, vec![b"one".as_slice(), b"two".as_slice()]);
        assert_eq!((msgs[0].ts_ms, msgs[0].from.as_str()), (100, "aaron/ws/s1"));

        let after = store.read_messages_since("ops", 100, 10).await.unwrap();
        assert_eq!(after.len(), 1, "since is exclusive: the watermark message itself is not re-delivered");
        assert_eq!(after[0].payload, b"two");

        let capped = store.read_messages_since("ops", 0, 1).await.unwrap();
        assert_eq!(capped.len(), 1);
        assert_eq!(capped[0].payload, b"one");
    }

    /// Purging removes only rows older than the cutoff.
    #[tokio::test]
    async fn store_messages_purge_before_removes_old_rows() {
        let store = store().await;
        store.append_message("ops", "aaron/ws/s1", b"old", 100).await.unwrap();
        store.append_message("ops", "aaron/ws/s1", b"new", 200).await.unwrap();

        store.purge_messages_before(150).await.unwrap();

        let survivors = store.read_messages_since("ops", 0, 10).await.unwrap();
        assert_eq!(survivors.len(), 1, "rows older than the retention cutoff must be gone");
        assert_eq!(survivors[0].payload, b"new");
    }

    /// Messages move with a renamed channel and vanish with a deleted one.
    #[tokio::test]
    async fn store_messages_cascade_on_channel_delete_and_rename() {
        let store = store().await;
        store.create_channel("ops", Visibility::Public, "aaron").await.unwrap();
        store.append_message("ops", "aaron/ws/s1", b"hello", 100).await.unwrap();

        store.rename_channel("ops", "ops2").await.unwrap();
        assert_eq!(store.read_messages_since("ops2", 0, 10).await.unwrap().len(), 1);
        assert!(store.read_messages_since("ops", 0, 10).await.unwrap().is_empty());

        store.delete_channel("ops2").await.unwrap();
        assert!(store.read_messages_since("ops2", 0, 10).await.unwrap().is_empty());
    }

    /// A created user can be fetched back; unknown users yield `None`.
    #[tokio::test]
    async fn user_create_and_fetch_round_trip() {
        let store = store().await;
        let created = store.create_user("aaron").await.unwrap();

        assert_eq!(store.get_user("aaron").await.unwrap(), Some(created));
        assert_eq!(store.get_user("nobody").await.unwrap(), None);
    }

    /// The unique index on `username` rejects a second user of the same name.
    #[tokio::test]
    async fn duplicate_username_is_rejected() {
        let store = store().await;
        store.create_user("aaron").await.unwrap();

        let second = store.create_user("aaron").await;
        assert!(second.is_err(), "the unique-username constraint must reject a duplicate");
    }

    /// A pubkey cannot be registered twice, even under a different user.
    #[tokio::test]
    async fn machine_pubkey_is_globally_unique() {
        let store = store().await;
        store.create_machine("aaron", "workstation", "PUBKEY-A").await.unwrap();

        assert!(store.create_machine("david", "desktop", "PUBKEY-A").await.is_err());
    }

    /// Machine names clash within a user but not across users.
    #[tokio::test]
    async fn machine_name_is_unique_within_a_user_but_not_across_users() {
        let store = store().await;
        store.create_machine("aaron", "workstation", "PUBKEY-A").await.unwrap();

        assert!(store.create_machine("aaron", "workstation", "PUBKEY-B").await.is_err());
        store.create_machine("david", "workstation", "PUBKEY-C").await.unwrap();
    }

    /// Machines can be listed per user and deleted by name.
    #[tokio::test]
    async fn machines_list_and_delete_for_a_user() {
        let store = store().await;
        store.create_machine("aaron", "workstation", "PUBKEY-A").await.unwrap();
        store.create_machine("aaron", "sno-box", "PUBKEY-B").await.unwrap();

        assert_eq!(store.list_machines("aaron").await.unwrap().len(), 2);

        store.delete_machine("aaron", "sno-box").await.unwrap();
        let left = store.list_machines("aaron").await.unwrap();
        assert_eq!(left.len(), 1);
        assert_eq!(left[0].name, "workstation");
    }

    /// Channels round-trip and their names are unique.
    #[tokio::test]
    async fn channel_create_fetch_and_unique_name() {
        let store = store().await;
        let created = store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();

        assert_eq!(created.visibility, "private");
        assert_eq!(store.get_channel("ops").await.unwrap(), Some(created));
        assert!(store.create_channel("ops", Visibility::Public, "david").await.is_err());
    }

    /// Invites round-trip and their tokens are unique.
    #[tokio::test]
    async fn invite_create_fetch_and_unique_token() {
        let store = store().await;
        let created = store.create_invite("ops", "tok-123", Some(5), None, "aaron").await.unwrap();

        assert_eq!(store.get_invite("tok-123").await.unwrap(), Some(created));
        assert!(store.create_invite("ops", "tok-123", None, None, "aaron").await.is_err());
    }

    /// The creator is a member from the start; adding is idempotent; removal works.
    #[tokio::test]
    async fn channel_membership_add_remove_and_list() {
        let store = store().await;
        store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        assert!(store.is_channel_member("ops", "aaron").await.unwrap());

        // Adding the same member twice must not fail.
        store.add_channel_member("ops", "david").await.unwrap();
        store.add_channel_member("ops", "david").await.unwrap();
        assert!(store.is_channel_member("ops", "david").await.unwrap());

        let mut members = store.list_channel_members("ops").await.unwrap();
        members.sort();
        assert_eq!(members, vec!["aaron".to_owned(), "david".to_owned()]);
        assert_eq!(store.list_user_memberships("david").await.unwrap(), vec!["ops".to_owned()]);

        store.remove_channel_member("ops", "david").await.unwrap();
        assert!(!store.is_channel_member("ops", "david").await.unwrap());
    }

    /// Memberships move with a renamed channel and vanish with a deleted one.
    #[tokio::test]
    async fn channel_memberships_follow_delete_and_rename() {
        let store = store().await;
        store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        store.add_channel_member("ops", "david").await.unwrap();

        store.rename_channel("ops", "operations").await.unwrap();
        assert!(store.is_channel_member("operations", "david").await.unwrap());
        assert!(!store.is_channel_member("ops", "david").await.unwrap());

        store.delete_channel("operations").await.unwrap();
        assert!(store.list_channel_members("operations").await.unwrap().is_empty());
    }

    /// Visibility updates are persisted.
    #[tokio::test]
    async fn channel_visibility_can_be_changed() {
        let store = store().await;
        store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();

        store.set_channel_visibility("ops", Visibility::Public).await.unwrap();

        let channel = store.get_channel("ops").await.unwrap().unwrap();
        assert_eq!(channel.visibility, "public");
    }

    /// Renaming moves the record and refuses a name that is already taken.
    #[tokio::test]
    async fn channel_rename_moves_the_record_and_respects_uniqueness() {
        let store = store().await;
        store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        store.create_channel("taken", Visibility::Public, "aaron").await.unwrap();

        store.rename_channel("ops", "operations").await.unwrap();
        assert!(store.get_channel("ops").await.unwrap().is_none());
        assert!(store.get_channel("operations").await.unwrap().is_some());

        assert!(store.rename_channel("operations", "taken").await.is_err());
    }

    /// Deleted channels no longer appear in the listing.
    #[tokio::test]
    async fn channel_can_be_deleted_and_listed() {
        let store = store().await;
        store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        store.create_channel("lobby", Visibility::Public, "aaron").await.unwrap();

        assert_eq!(store.list_channels().await.unwrap().len(), 2);

        store.delete_channel("ops").await.unwrap();
        let left = store.list_channels().await.unwrap();
        assert_eq!(left.len(), 1);
        assert_eq!(left[0].name, "lobby");
    }

    /// The use counter can be overwritten and the invite revoked.
    #[tokio::test]
    async fn invite_uses_can_be_decremented_and_revoked() {
        let store = store().await;
        store.create_invite("ops", "tok-123", Some(5), None, "aaron").await.unwrap();

        store.set_invite_uses("tok-123", 4).await.unwrap();
        assert_eq!(store.get_invite("tok-123").await.unwrap().unwrap().uses_remaining, Some(4));

        store.delete_invite("tok-123").await.unwrap();
        assert!(store.get_invite("tok-123").await.unwrap().is_none());
    }

    /// Deleting a channel removes its invites.
    #[tokio::test]
    async fn invites_are_dropped_when_the_channel_is_deleted() {
        let store = store().await;
        store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        store.create_invite("ops", "tok", Some(5), None, "aaron").await.unwrap();

        store.delete_channel("ops").await.unwrap();
        assert!(
            store.get_invite("tok").await.unwrap().is_none(),
            "deleting a channel must drop its invites so a future same-named channel cannot honor them"
        );
    }

    /// Renaming a channel re-points its invites.
    #[tokio::test]
    async fn invites_follow_a_channel_rename() {
        let store = store().await;
        store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        store.create_invite("ops", "tok", None, None, "aaron").await.unwrap();

        store.rename_channel("ops", "operations").await.unwrap();
        let invite = store.get_invite("tok").await.unwrap().unwrap();
        assert_eq!(invite.channel, "operations", "an invite must follow its renamed channel");
    }

    /// Bans are idempotent to add, listable, and removable.
    #[tokio::test]
    async fn ban_add_remove_and_list_round_trip() {
        let store = store().await;
        store.create_channel("ops", Visibility::Public, "aaron").await.unwrap();

        store.add_ban("ops", "bob").await.unwrap();
        // Banning the same user twice must not fail.
        store.add_ban("ops", "bob").await.unwrap();
        store.add_ban("ops", "mallory").await.unwrap();

        let mut bans = store.list_bans().await.unwrap();
        bans.sort();
        assert_eq!(bans, vec![("ops".to_owned(), "bob".to_owned()), ("ops".to_owned(), "mallory".to_owned())]);

        store.remove_ban("ops", "bob").await.unwrap();
        assert_eq!(store.list_bans().await.unwrap(), vec![("ops".to_owned(), "mallory".to_owned())]);
    }

    /// Bans move with a renamed channel and vanish with a deleted one.
    #[tokio::test]
    async fn bans_follow_delete_and_rename() {
        let store = store().await;
        store.create_channel("ops", Visibility::Public, "aaron").await.unwrap();
        store.add_ban("ops", "bob").await.unwrap();

        store.rename_channel("ops", "operations").await.unwrap();
        assert_eq!(store.list_bans().await.unwrap(), vec![("operations".to_owned(), "bob".to_owned())]);

        store.delete_channel("operations").await.unwrap();
        assert!(store.list_bans().await.unwrap().is_empty(), "deleting a channel must drop its bans");
    }

    /// Users can be listed and deleted by username.
    #[tokio::test]
    async fn users_can_be_listed_and_deleted() {
        let store = store().await;
        store.create_user("aaron").await.unwrap();
        store.create_user("david").await.unwrap();

        assert_eq!(store.list_users().await.unwrap().len(), 2);

        store.delete_user("david").await.unwrap();
        let left = store.list_users().await.unwrap();
        assert_eq!(left.len(), 1);
        assert_eq!(left[0].username, "aaron");
    }
}