1use std::path::Path;
14
15use anyhow::Context as _;
16use surrealdb::{
17 Surreal,
18 engine::local::{Db, Mem, SurrealKv},
19 types::{SurrealValue, Value},
20};
21
22use crate::base::{Res, Visibility, Void};
23
/// Namespace selected on the embedded database when a [`Store`] is initialised.
const NAMESPACE: &str = "conclave";
/// Database selected on the embedded database when a [`Store`] is initialised.
const DATABASE: &str = "conclave";

/// SurrealQL run by `Store::init` on every open.
///
/// Declares one unique index per table key (single-field or composite) and the `meta` table.
/// Every statement carries `IF NOT EXISTS`, so applying it to an already-initialised store is
/// expected to leave existing definitions alone.
const SCHEMA: &str = "\
DEFINE INDEX IF NOT EXISTS user_username ON user FIELDS username UNIQUE;
DEFINE INDEX IF NOT EXISTS machine_pubkey ON machine FIELDS pubkey UNIQUE;
DEFINE INDEX IF NOT EXISTS machine_user_name ON machine FIELDS user, name UNIQUE;
DEFINE INDEX IF NOT EXISTS channel_name ON channel FIELDS name UNIQUE;
DEFINE INDEX IF NOT EXISTS invite_token ON invite FIELDS token UNIQUE;
DEFINE INDEX IF NOT EXISTS membership_channel_user ON membership FIELDS channel, user UNIQUE;
DEFINE INDEX IF NOT EXISTS ban_channel_user ON ban FIELDS channel, user UNIQUE;
DEFINE TABLE IF NOT EXISTS meta;
";
40
/// Row shape of the `meta` table, read and written by `Store::instance_id`.
#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
pub struct MetaRecord {
    /// Token produced by `crate::identity::generate_token` the first time the id is requested.
    pub instance_id: String,
}
47
/// Row shape of the `user` table.
#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
pub struct UserRecord {
    /// Login name; covered by the unique `user_username` index.
    pub username: String,
    /// Creation time as an RFC 3339 string (filled from `now_rfc3339` by `Store::create_user`).
    pub created_at: String,
}
56
/// Row shape of the `machine` table: one registered machine belonging to a user.
#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
pub struct MachineRecord {
    /// Username of the owning user (stored as plain text, not as a record link).
    pub user: String,
    /// Machine name; unique together with `user` (index `machine_user_name`).
    pub name: String,
    /// Public key text; unique across all machines (index `machine_pubkey`).
    /// `Store::create_machine` receives it as `pubkey_base64` — presumably base64, confirm with callers.
    pub pubkey: String,
    /// Registration time as an RFC 3339 string (filled from `now_rfc3339`).
    pub added_at: String,
}
70
/// Row shape of the `channel` table.
#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
pub struct ChannelRecord {
    /// Channel name; covered by the unique `channel_name` index.
    pub name: String,
    /// Textual form of a `Visibility`, as produced by `Visibility::as_str`.
    pub visibility: String,
    /// Username passed as the creator when the channel was created.
    pub created_by: String,
    /// Creation time as an RFC 3339 string (filled from `now_rfc3339`).
    pub created_at: String,
}
85
/// Row shape of the `invite` table.
#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
pub struct InviteRecord {
    /// Name of the channel the invite belongs to.
    pub channel: String,
    /// Invite token; covered by the unique `invite_token` index.
    pub token: String,
    /// Remaining uses. `Store::try_consume_invite_use` only decrements rows where this is `> 0`.
    /// NOTE(review): `None` presumably means "unlimited" — confirm with callers.
    pub uses_remaining: Option<i64>,
    /// Optional expiry, stored as text. The store never interprets it; format is up to the caller
    /// (presumably RFC 3339 like the other timestamps — TODO confirm).
    pub expires_at: Option<String>,
    /// Username passed as the creator when the invite was created.
    pub created_by: String,
}
100
// Query-binding structs: each one is handed to `.bind(..)`, and its field names match the
// `$parameter` names used in the accompanying SurrealQL text.

/// Binds `$username`.
#[derive(SurrealValue)]
struct ByUsername {
    username: String,
}

/// Binds `$pubkey`.
#[derive(SurrealValue)]
struct ByPubkey {
    pubkey: String,
}

/// Binds `$user`.
#[derive(SurrealValue)]
struct ByUser {
    user: String,
}

/// Binds `$name`.
#[derive(SurrealValue)]
struct ByName {
    name: String,
}

/// Binds `$tok` for invite-token lookups.
#[derive(SurrealValue)]
struct ByToken {
    // NOTE(review): named `tok` rather than `token`, presumably because `$token` is a reserved
    // SurrealDB parameter — confirm before renaming.
    tok: String,
}

/// Binds `$user` and `$name` (a machine addressed by owner and name).
#[derive(SurrealValue)]
struct ByUserAndName {
    user: String,
    name: String,
}

/// Binds `$name` and `$visibility` for a channel visibility update.
#[derive(SurrealValue)]
struct SetVisibility {
    name: String,
    visibility: String,
}

/// Binds `$old` and `$new` for a channel rename.
#[derive(SurrealValue)]
struct Rename {
    old: String,
    new: String,
}

/// Binds `$tok` and `$uses` for overwriting an invite's remaining uses.
#[derive(SurrealValue)]
struct SetUses {
    // Same `tok` naming as `ByToken`.
    tok: String,
    uses: i64,
}

/// A `(channel, user)` pair. Bound as `$channel` / `$user` for membership and ban queries, and
/// also used as the decode target for rows of `SELECT channel, user FROM ban`.
#[derive(SurrealValue)]
struct Membership {
    channel: String,
    user: String,
}

/// Binds `$channel`.
#[derive(SurrealValue)]
struct ByChannel {
    channel: String,
}
164
/// Upper bound on how many times a write is attempted when the database keeps reporting a
/// write conflict (see `is_write_conflict`).
const MAX_WRITE_ATTEMPTS: usize = 64;
168
169fn is_write_conflict(err: &surrealdb::Error) -> bool {
175 err.to_string().to_lowercase().contains("conflict")
176}
177
/// Cloneable handle around the embedded SurrealDB connection; all persistence goes through its
/// methods.
#[derive(Clone)]
pub struct Store {
    /// Connection to the local engine (on-disk SurrealKV or in-memory, depending on how it was opened).
    db: Surreal<Db>,
}
184
185impl Store {
186 pub async fn open(path: &Path) -> Res<Self> {
192 let db = Surreal::new::<SurrealKv>(path.to_string_lossy().as_ref()).await.context("failed to open the embedded store")?;
193 Self::init(db).await
194 }
195
196 pub async fn open_in_memory() -> Res<Self> {
202 let db = Surreal::new::<Mem>(()).await.context("failed to open the in-memory store")?;
203 Self::init(db).await
204 }
205
206 async fn init(db: Surreal<Db>) -> Res<Self> {
207 db.use_ns(NAMESPACE).use_db(DATABASE).await.context("failed to select namespace/database")?;
208 db.query(SCHEMA).await.context("failed to apply schema")?.check().context("schema application reported an error")?;
209 Ok(Self { db })
210 }
211
212 async fn insert<T: SurrealValue>(&self, table: &str, record: T) -> Void {
213 let _created: Option<Value> = self.db.create(table.to_owned()).content(record).await.with_context(|| format!("failed to insert into `{table}`"))?;
214 Ok(())
215 }
216
217 pub async fn instance_id(&self) -> Res<String> {
226 let mut response = self.db.query("SELECT * OMIT id FROM meta").await.context("failed to query server meta")?;
227 let rows: Vec<MetaRecord> = response.take(0).context("failed to decode server meta")?;
228 if let Some(meta) = rows.into_iter().next() {
229 return Ok(meta.instance_id);
230 }
231
232 let id = crate::identity::generate_token()?;
233 let created: Result<Option<MetaRecord>, _> = self.db.create(("meta", "server")).content(MetaRecord { instance_id: id.clone() }).await;
236 if created.is_ok() {
237 return Ok(id);
238 }
239 let mut response = self.db.query("SELECT * OMIT id FROM meta").await.context("failed to re-query server meta")?;
240 let rows: Vec<MetaRecord> = response.take(0).context("failed to decode server meta")?;
241 rows.into_iter().next().map(|meta| meta.instance_id).context("server meta write raced but no record exists")
242 }
243
244 pub async fn create_user(&self, username: &str) -> Res<UserRecord> {
250 let record = UserRecord {
251 username: username.to_owned(),
252 created_at: now_rfc3339(),
253 };
254 self.insert("user", record.clone()).await?;
255 Ok(record)
256 }
257
258 pub async fn get_user(&self, username: &str) -> Res<Option<UserRecord>> {
264 let mut response = self
265 .db
266 .query("SELECT * OMIT id FROM user WHERE username = $username")
267 .bind(ByUsername { username: username.to_owned() })
268 .await
269 .context("failed to query user")?;
270 let rows: Vec<UserRecord> = response.take(0).context("failed to decode user rows")?;
271 Ok(rows.into_iter().next())
272 }
273
274 pub async fn create_machine(&self, user: &str, name: &str, pubkey_base64: &str) -> Res<MachineRecord> {
281 let record = MachineRecord {
282 user: user.to_owned(),
283 name: name.to_owned(),
284 pubkey: pubkey_base64.to_owned(),
285 added_at: now_rfc3339(),
286 };
287 self.insert("machine", record.clone()).await?;
288 Ok(record)
289 }
290
291 pub async fn get_machine_by_pubkey(&self, pubkey_base64: &str) -> Res<Option<MachineRecord>> {
297 let mut response = self
298 .db
299 .query("SELECT * OMIT id FROM machine WHERE pubkey = $pubkey")
300 .bind(ByPubkey { pubkey: pubkey_base64.to_owned() })
301 .await
302 .context("failed to query machine")?;
303 let rows: Vec<MachineRecord> = response.take(0).context("failed to decode machine rows")?;
304 Ok(rows.into_iter().next())
305 }
306
307 pub async fn list_machines(&self, user: &str) -> Res<Vec<MachineRecord>> {
313 let mut response = self
314 .db
315 .query("SELECT * OMIT id FROM machine WHERE user = $user")
316 .bind(ByUser { user: user.to_owned() })
317 .await
318 .context("failed to list machines")?;
319 response.take(0).context("failed to decode machine rows")
320 }
321
322 pub async fn delete_machine(&self, user: &str, name: &str) -> Void {
328 self.db
329 .query("DELETE machine WHERE user = $user AND name = $name")
330 .bind(ByUserAndName {
331 user: user.to_owned(),
332 name: name.to_owned(),
333 })
334 .await
335 .context("failed to delete machine")?
336 .check()
337 .context("machine delete reported an error")?;
338 Ok(())
339 }
340
341 pub async fn create_channel(&self, name: &str, visibility: Visibility, created_by: &str) -> Res<ChannelRecord> {
347 let record = ChannelRecord {
348 name: name.to_owned(),
349 visibility: visibility.as_str().to_owned(),
350 created_by: created_by.to_owned(),
351 created_at: now_rfc3339(),
352 };
353 self.insert("channel", record.clone()).await?;
354 self.add_channel_member(name, created_by).await?;
356 Ok(record)
357 }
358
359 pub async fn get_channel(&self, name: &str) -> Res<Option<ChannelRecord>> {
365 let mut response = self
366 .db
367 .query("SELECT * OMIT id FROM channel WHERE name = $name")
368 .bind(ByName { name: name.to_owned() })
369 .await
370 .context("failed to query channel")?;
371 let rows: Vec<ChannelRecord> = response.take(0).context("failed to decode channel rows")?;
372 Ok(rows.into_iter().next())
373 }
374
375 pub async fn create_invite(&self, channel: &str, token: &str, uses_remaining: Option<i64>, expires_at: Option<String>, created_by: &str) -> Res<InviteRecord> {
381 let record = InviteRecord {
382 channel: channel.to_owned(),
383 token: token.to_owned(),
384 uses_remaining,
385 expires_at,
386 created_by: created_by.to_owned(),
387 };
388 self.insert("invite", record.clone()).await?;
389 Ok(record)
390 }
391
392 pub async fn get_invite(&self, token: &str) -> Res<Option<InviteRecord>> {
398 let mut response = self
399 .db
400 .query("SELECT * OMIT id FROM invite WHERE token = $tok")
401 .bind(ByToken { tok: token.to_owned() })
402 .await
403 .context("failed to query invite")?;
404 let rows: Vec<InviteRecord> = response.take(0).context("failed to decode invite rows")?;
405 Ok(rows.into_iter().next())
406 }
407
408 pub async fn list_invites(&self, channel: &str) -> Res<Vec<InviteRecord>> {
414 let mut response = self
415 .db
416 .query("SELECT * OMIT id FROM invite WHERE channel = $channel")
417 .bind(ByChannel { channel: channel.to_owned() })
418 .await
419 .context("failed to list invites")?;
420 response.take(0).context("failed to decode invite rows")
421 }
422
423 pub async fn list_channels(&self) -> Res<Vec<ChannelRecord>> {
429 let mut response = self.db.query("SELECT * OMIT id FROM channel").await.context("failed to list channels")?;
430 response.take(0).context("failed to decode channel rows")
431 }
432
433 pub async fn add_channel_member(&self, channel: &str, user: &str) -> Void {
442 for attempt in 0..MAX_WRITE_ATTEMPTS {
443 let outcome = self
444 .db
445 .query("INSERT INTO membership { channel: $channel, user: $user } ON DUPLICATE KEY UPDATE channel = $channel")
446 .bind(Membership {
447 channel: channel.to_owned(),
448 user: user.to_owned(),
449 })
450 .await
451 .and_then(surrealdb::IndexedResults::check);
452 match outcome {
453 Ok(_) => return Ok(()),
454 Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
455 Err(e) => return Err(anyhow::Error::new(e).context("failed to add channel member")),
456 }
457 }
458 anyhow::bail!("adding a channel member exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
459 }
460
461 pub async fn remove_channel_member(&self, channel: &str, user: &str) -> Void {
467 for attempt in 0..MAX_WRITE_ATTEMPTS {
468 let outcome = self
469 .db
470 .query("DELETE membership WHERE channel = $channel AND user = $user")
471 .bind(Membership {
472 channel: channel.to_owned(),
473 user: user.to_owned(),
474 })
475 .await
476 .and_then(surrealdb::IndexedResults::check);
477 match outcome {
478 Ok(_) => return Ok(()),
479 Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
480 Err(e) => return Err(anyhow::Error::new(e).context("failed to remove channel member")),
481 }
482 }
483 anyhow::bail!("removing a channel member exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
484 }
485
486 pub async fn is_channel_member(&self, channel: &str, user: &str) -> Res<bool> {
492 let mut response = self
493 .db
494 .query("SELECT VALUE user FROM membership WHERE channel = $channel AND user = $user")
495 .bind(Membership {
496 channel: channel.to_owned(),
497 user: user.to_owned(),
498 })
499 .await
500 .context("failed to query membership")?;
501 let rows: Vec<String> = response.take(0).context("failed to decode membership rows")?;
502 Ok(!rows.is_empty())
503 }
504
505 pub async fn list_user_memberships(&self, user: &str) -> Res<Vec<String>> {
511 let mut response = self
512 .db
513 .query("SELECT VALUE channel FROM membership WHERE user = $user")
514 .bind(ByUser { user: user.to_owned() })
515 .await
516 .context("failed to list user memberships")?;
517 response.take(0).context("failed to decode membership channels")
518 }
519
520 pub async fn list_channel_members(&self, channel: &str) -> Res<Vec<String>> {
526 let mut response = self
527 .db
528 .query("SELECT VALUE user FROM membership WHERE channel = $channel")
529 .bind(ByChannel { channel: channel.to_owned() })
530 .await
531 .context("failed to list channel members")?;
532 response.take(0).context("failed to decode membership users")
533 }
534
535 pub async fn add_ban(&self, channel: &str, user: &str) -> Void {
542 for attempt in 0..MAX_WRITE_ATTEMPTS {
543 let outcome = self
544 .db
545 .query("INSERT INTO ban { channel: $channel, user: $user } ON DUPLICATE KEY UPDATE channel = $channel")
546 .bind(Membership {
547 channel: channel.to_owned(),
548 user: user.to_owned(),
549 })
550 .await
551 .and_then(surrealdb::IndexedResults::check);
552 match outcome {
553 Ok(_) => return Ok(()),
554 Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
555 Err(e) => return Err(anyhow::Error::new(e).context("failed to add ban")),
556 }
557 }
558 anyhow::bail!("adding a ban exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
559 }
560
561 pub async fn remove_ban(&self, channel: &str, user: &str) -> Void {
567 for attempt in 0..MAX_WRITE_ATTEMPTS {
568 let outcome = self
569 .db
570 .query("DELETE ban WHERE channel = $channel AND user = $user")
571 .bind(Membership {
572 channel: channel.to_owned(),
573 user: user.to_owned(),
574 })
575 .await
576 .and_then(surrealdb::IndexedResults::check);
577 match outcome {
578 Ok(_) => return Ok(()),
579 Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
580 Err(e) => return Err(anyhow::Error::new(e).context("failed to remove ban")),
581 }
582 }
583 anyhow::bail!("removing a ban exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
584 }
585
586 pub async fn list_bans(&self) -> Res<Vec<(String, String)>> {
592 let mut response = self.db.query("SELECT channel, user FROM ban").await.context("failed to list bans")?;
593 let rows: Vec<Membership> = response.take(0).context("failed to decode ban rows")?;
594 Ok(rows.into_iter().map(|row| (row.channel, row.user)).collect())
595 }
596
597 pub async fn list_channel_bans(&self, channel: &str) -> Res<Vec<String>> {
603 let mut response = self
604 .db
605 .query("SELECT VALUE user FROM ban WHERE channel = $channel")
606 .bind(ByChannel { channel: channel.to_owned() })
607 .await
608 .context("failed to list channel bans")?;
609 response.take(0).context("failed to decode ban users")
610 }
611
612 pub async fn list_channels_created_by(&self, user: &str) -> Res<Vec<String>> {
618 let mut response = self
619 .db
620 .query("SELECT VALUE name FROM channel WHERE created_by = $user")
621 .bind(ByUser { user: user.to_owned() })
622 .await
623 .context("failed to list created channels")?;
624 response.take(0).context("failed to decode channel names")
625 }
626
627 pub async fn delete_user_memberships(&self, user: &str) -> Void {
633 self.db
634 .query("DELETE membership WHERE user = $user")
635 .bind(ByUser { user: user.to_owned() })
636 .await
637 .context("failed to delete user memberships")?
638 .check()
639 .context("user membership delete reported an error")?;
640 Ok(())
641 }
642
643 pub async fn set_channel_visibility(&self, name: &str, visibility: Visibility) -> Void {
649 self.db
650 .query("UPDATE channel SET visibility = $visibility WHERE name = $name")
651 .bind(SetVisibility {
652 name: name.to_owned(),
653 visibility: visibility.as_str().to_owned(),
654 })
655 .await
656 .context("failed to update channel visibility")?
657 .check()
658 .context("channel visibility update reported an error")?;
659 Ok(())
660 }
661
662 pub async fn rename_channel(&self, old: &str, new: &str) -> Void {
668 self.db
669 .query("UPDATE channel SET name = $new WHERE name = $old")
670 .bind(Rename { old: old.to_owned(), new: new.to_owned() })
671 .await
672 .context("failed to rename channel")?
673 .check()
674 .context("channel rename reported an error")?;
675 self.db
677 .query("UPDATE membership SET channel = $new WHERE channel = $old")
678 .bind(Rename { old: old.to_owned(), new: new.to_owned() })
679 .await
680 .context("failed to migrate channel memberships")?
681 .check()
682 .context("membership rename reported an error")?;
683 self.db
684 .query("UPDATE invite SET channel = $new WHERE channel = $old")
685 .bind(Rename { old: old.to_owned(), new: new.to_owned() })
686 .await
687 .context("failed to migrate channel invites")?
688 .check()
689 .context("invite rename reported an error")?;
690 self.db
691 .query("UPDATE ban SET channel = $new WHERE channel = $old")
692 .bind(Rename { old: old.to_owned(), new: new.to_owned() })
693 .await
694 .context("failed to migrate channel bans")?
695 .check()
696 .context("ban rename reported an error")?;
697 Ok(())
698 }
699
700 pub async fn delete_channel(&self, name: &str) -> Void {
706 self.db
707 .query("DELETE channel WHERE name = $name")
708 .bind(ByName { name: name.to_owned() })
709 .await
710 .context("failed to delete channel")?
711 .check()
712 .context("channel delete reported an error")?;
713 self.db
716 .query("DELETE membership WHERE channel = $channel")
717 .bind(ByChannel { channel: name.to_owned() })
718 .await
719 .context("failed to delete channel memberships")?
720 .check()
721 .context("membership delete reported an error")?;
722 self.db
723 .query("DELETE invite WHERE channel = $channel")
724 .bind(ByChannel { channel: name.to_owned() })
725 .await
726 .context("failed to delete channel invites")?
727 .check()
728 .context("invite delete reported an error")?;
729 self.db
730 .query("DELETE ban WHERE channel = $channel")
731 .bind(ByChannel { channel: name.to_owned() })
732 .await
733 .context("failed to delete channel bans")?
734 .check()
735 .context("ban delete reported an error")?;
736 Ok(())
737 }
738
739 pub async fn set_invite_uses(&self, token: &str, uses_remaining: i64) -> Void {
745 self.db
746 .query("UPDATE invite SET uses_remaining = $uses WHERE token = $tok")
747 .bind(SetUses {
748 tok: token.to_owned(),
749 uses: uses_remaining,
750 })
751 .await
752 .context("failed to update invite uses")?
753 .check()
754 .context("invite uses update reported an error")?;
755 Ok(())
756 }
757
758 pub async fn try_consume_invite_use(&self, token: &str) -> Res<bool> {
768 for attempt in 0..MAX_WRITE_ATTEMPTS {
769 let outcome = self
770 .db
771 .query("UPDATE invite SET uses_remaining = uses_remaining - 1 WHERE token = $tok AND uses_remaining > 0 RETURN VALUE uses_remaining")
772 .bind(ByToken { tok: token.to_owned() })
773 .await
774 .and_then(surrealdb::IndexedResults::check);
775 match outcome {
776 Ok(mut response) => {
777 let remaining: Vec<i64> = response.take(0).context("failed to decode invite uses")?;
778 return match remaining.into_iter().next() {
779 None => Ok(false),
781 Some(0) => {
783 self.delete_invite(token).await?;
784 Ok(true)
785 }
786 Some(_) => Ok(true),
788 };
789 }
790 Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
791 Err(e) => return Err(anyhow::Error::new(e).context("failed to consume invite use")),
792 }
793 }
794 anyhow::bail!("consuming an invite use exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
795 }
796
797 pub async fn delete_invite(&self, token: &str) -> Void {
803 self.db
804 .query("DELETE invite WHERE token = $tok")
805 .bind(ByToken { tok: token.to_owned() })
806 .await
807 .context("failed to delete invite")?
808 .check()
809 .context("invite delete reported an error")?;
810 Ok(())
811 }
812
813 pub async fn list_users(&self) -> Res<Vec<UserRecord>> {
819 let mut response = self.db.query("SELECT * OMIT id FROM user").await.context("failed to list users")?;
820 response.take(0).context("failed to decode user rows")
821 }
822
823 pub async fn delete_user(&self, username: &str) -> Void {
829 self.db
830 .query("DELETE user WHERE username = $username")
831 .bind(ByUsername { username: username.to_owned() })
832 .await
833 .context("failed to delete user")?
834 .check()
835 .context("user delete reported an error")?;
836 Ok(())
837 }
838}
839
840fn now_rfc3339() -> String {
841 chrono::Utc::now().to_rfc3339()
842}
843
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)]

    use super::*;
    use pretty_assertions::assert_eq;

    /// Fresh in-memory store for a single test.
    async fn store() -> Store {
        let opened = Store::open_in_memory().await;
        opened.unwrap()
    }

    /// The instance id survives closing and reopening the on-disk store.
    #[tokio::test]
    async fn store_instance_id_is_stable_across_reopen() {
        let scratch = tempfile::TempDir::new().unwrap();

        // Scope the first handle so it is dropped before the directory is opened again.
        let original_id = {
            let db = Store::open(scratch.path()).await.unwrap();
            let id = db.instance_id().await.unwrap();
            assert!(!id.is_empty());
            assert_eq!(db.instance_id().await.unwrap(), id);
            id
        };

        // Reopening is polled (up to 50 tries, 100 ms apart); presumably the engine needs a
        // moment after the drop before the directory can be opened again.
        let reopened = 'reopen: {
            for _ in 0..50 {
                if let Ok(db) = Store::open(scratch.path()).await {
                    break 'reopen db;
                }
                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            }
            panic!("the store never became reopenable after drop");
        };
        assert_eq!(reopened.instance_id().await.unwrap(), original_id);
    }

    /// A created user can be fetched back; an unknown name yields `None`.
    #[tokio::test]
    async fn user_create_and_fetch_round_trip() {
        let db = store().await;
        let aaron = db.create_user("aaron").await.unwrap();

        assert_eq!(db.get_user("aaron").await.unwrap(), Some(aaron));
        assert_eq!(db.get_user("nobody").await.unwrap(), None);
    }

    /// Creating the same username twice fails.
    #[tokio::test]
    async fn duplicate_username_is_rejected() {
        let db = store().await;
        db.create_user("aaron").await.unwrap();

        let second = db.create_user("aaron").await;
        assert!(second.is_err(), "the unique-username constraint must reject a duplicate");
    }

    /// A public key cannot be registered twice, even by different users.
    #[tokio::test]
    async fn machine_pubkey_is_globally_unique() {
        let db = store().await;
        db.create_machine("aaron", "workstation", "PUBKEY-A").await.unwrap();

        let reused_key = db.create_machine("david", "desktop", "PUBKEY-A").await;
        assert!(reused_key.is_err());
    }

    /// Machine names collide within one user but may repeat across users.
    #[tokio::test]
    async fn machine_name_is_unique_within_a_user_but_not_across_users() {
        let db = store().await;
        db.create_machine("aaron", "workstation", "PUBKEY-A").await.unwrap();

        let same_owner = db.create_machine("aaron", "workstation", "PUBKEY-B").await;
        assert!(same_owner.is_err());
        db.create_machine("david", "workstation", "PUBKEY-C").await.unwrap();
    }

    /// Listing reflects a user's machines before and after one is deleted.
    #[tokio::test]
    async fn machines_list_and_delete_for_a_user() {
        let db = store().await;
        db.create_machine("aaron", "workstation", "PUBKEY-A").await.unwrap();
        db.create_machine("aaron", "sno-box", "PUBKEY-B").await.unwrap();

        assert_eq!(db.list_machines("aaron").await.unwrap().len(), 2);

        db.delete_machine("aaron", "sno-box").await.unwrap();
        let left = db.list_machines("aaron").await.unwrap();
        assert_eq!(left.len(), 1);
        assert_eq!(left[0].name, "workstation");
    }

    /// A channel round-trips through the store and its name cannot be reused.
    #[tokio::test]
    async fn channel_create_fetch_and_unique_name() {
        let db = store().await;
        let ops = db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();

        assert_eq!(ops.visibility, "private");
        assert_eq!(db.get_channel("ops").await.unwrap(), Some(ops));
        let duplicate = db.create_channel("ops", Visibility::Public, "david").await;
        assert!(duplicate.is_err());
    }

    /// An invite round-trips through the store and its token cannot be reused.
    #[tokio::test]
    async fn invite_create_fetch_and_unique_token() {
        let db = store().await;
        let invite = db.create_invite("ops", "tok-123", Some(5), None, "aaron").await.unwrap();

        assert_eq!(db.get_invite("tok-123").await.unwrap(), Some(invite));
        let duplicate = db.create_invite("ops", "tok-123", None, None, "aaron").await;
        assert!(duplicate.is_err());
    }

    /// Membership covers the creator, is idempotent on add, and can be listed and removed.
    #[tokio::test]
    async fn channel_membership_add_remove_and_list() {
        let db = store().await;
        db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        assert!(db.is_channel_member("ops", "aaron").await.unwrap());

        // Adding the same member twice must not fail.
        db.add_channel_member("ops", "david").await.unwrap();
        db.add_channel_member("ops", "david").await.unwrap();
        assert!(db.is_channel_member("ops", "david").await.unwrap());

        let mut roster = db.list_channel_members("ops").await.unwrap();
        roster.sort();
        assert_eq!(roster, vec!["aaron".to_owned(), "david".to_owned()]);
        assert_eq!(db.list_user_memberships("david").await.unwrap(), vec!["ops".to_owned()]);

        db.remove_channel_member("ops", "david").await.unwrap();
        assert!(!db.is_channel_member("ops", "david").await.unwrap());
    }

    /// Memberships move with a rename and disappear with a delete.
    #[tokio::test]
    async fn channel_memberships_follow_delete_and_rename() {
        let db = store().await;
        db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        db.add_channel_member("ops", "david").await.unwrap();

        db.rename_channel("ops", "operations").await.unwrap();
        assert!(db.is_channel_member("operations", "david").await.unwrap());
        assert!(!db.is_channel_member("ops", "david").await.unwrap());

        db.delete_channel("operations").await.unwrap();
        assert!(db.list_channel_members("operations").await.unwrap().is_empty());
    }

    /// Visibility updates are persisted.
    #[tokio::test]
    async fn channel_visibility_can_be_changed() {
        let db = store().await;
        db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();

        db.set_channel_visibility("ops", Visibility::Public).await.unwrap();

        let ops = db.get_channel("ops").await.unwrap().unwrap();
        assert_eq!(ops.visibility, "public");
    }

    /// A rename moves the record; renaming onto an existing name fails.
    #[tokio::test]
    async fn channel_rename_moves_the_record_and_respects_uniqueness() {
        let db = store().await;
        db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        db.create_channel("taken", Visibility::Public, "aaron").await.unwrap();

        db.rename_channel("ops", "operations").await.unwrap();
        assert!(db.get_channel("ops").await.unwrap().is_none());
        assert!(db.get_channel("operations").await.unwrap().is_some());

        let collision = db.rename_channel("operations", "taken").await;
        assert!(collision.is_err());
    }

    /// Listing reflects channels before and after one is deleted.
    #[tokio::test]
    async fn channel_can_be_deleted_and_listed() {
        let db = store().await;
        db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        db.create_channel("lobby", Visibility::Public, "aaron").await.unwrap();

        assert_eq!(db.list_channels().await.unwrap().len(), 2);

        db.delete_channel("ops").await.unwrap();
        let left = db.list_channels().await.unwrap();
        assert_eq!(left.len(), 1);
        assert_eq!(left[0].name, "lobby");
    }

    /// An invite's use count can be overwritten and the invite can be deleted.
    #[tokio::test]
    async fn invite_uses_can_be_decremented_and_revoked() {
        let db = store().await;
        db.create_invite("ops", "tok-123", Some(5), None, "aaron").await.unwrap();

        db.set_invite_uses("tok-123", 4).await.unwrap();
        let invite = db.get_invite("tok-123").await.unwrap().unwrap();
        assert_eq!(invite.uses_remaining, Some(4));

        db.delete_invite("tok-123").await.unwrap();
        assert!(db.get_invite("tok-123").await.unwrap().is_none());
    }

    /// Deleting a channel removes its invites.
    #[tokio::test]
    async fn invites_are_dropped_when_the_channel_is_deleted() {
        let db = store().await;
        db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        db.create_invite("ops", "tok", Some(5), None, "aaron").await.unwrap();

        db.delete_channel("ops").await.unwrap();
        assert!(
            db.get_invite("tok").await.unwrap().is_none(),
            "deleting a channel must drop its invites so a future same-named channel cannot honor them"
        );
    }

    /// Renaming a channel repoints its invites.
    #[tokio::test]
    async fn invites_follow_a_channel_rename() {
        let db = store().await;
        db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        db.create_invite("ops", "tok", None, None, "aaron").await.unwrap();

        db.rename_channel("ops", "operations").await.unwrap();
        let invite = db.get_invite("tok").await.unwrap().unwrap();
        assert_eq!(invite.channel, "operations", "an invite must follow its renamed channel");
    }

    /// Bans are idempotent on add and can be listed and removed.
    #[tokio::test]
    async fn ban_add_remove_and_list_round_trip() {
        let db = store().await;
        db.create_channel("ops", Visibility::Public, "aaron").await.unwrap();

        // Banning the same user twice must not fail.
        db.add_ban("ops", "bob").await.unwrap();
        db.add_ban("ops", "bob").await.unwrap();
        db.add_ban("ops", "mallory").await.unwrap();

        let mut banned = db.list_bans().await.unwrap();
        banned.sort();
        assert_eq!(banned, vec![("ops".to_owned(), "bob".to_owned()), ("ops".to_owned(), "mallory".to_owned())]);

        db.remove_ban("ops", "bob").await.unwrap();
        assert_eq!(db.list_bans().await.unwrap(), vec![("ops".to_owned(), "mallory".to_owned())]);
    }

    /// Bans move with a rename and disappear with a delete.
    #[tokio::test]
    async fn bans_follow_delete_and_rename() {
        let db = store().await;
        db.create_channel("ops", Visibility::Public, "aaron").await.unwrap();
        db.add_ban("ops", "bob").await.unwrap();

        db.rename_channel("ops", "operations").await.unwrap();
        assert_eq!(db.list_bans().await.unwrap(), vec![("operations".to_owned(), "bob".to_owned())]);

        db.delete_channel("operations").await.unwrap();
        assert!(db.list_bans().await.unwrap().is_empty(), "deleting a channel must drop its bans");
    }

    /// Listing reflects users before and after one is deleted.
    #[tokio::test]
    async fn users_can_be_listed_and_deleted() {
        let db = store().await;
        db.create_user("aaron").await.unwrap();
        db.create_user("david").await.unwrap();

        assert_eq!(db.list_users().await.unwrap().len(), 2);

        db.delete_user("david").await.unwrap();
        let left = db.list_users().await.unwrap();
        assert_eq!(left.len(), 1);
        assert_eq!(left[0].username, "aaron");
    }
}