1use std::path::Path;
14
15use anyhow::Context as _;
16use surrealdb::{
17 Surreal,
18 engine::local::{Db, Mem, SurrealKv},
19 types::{SurrealValue, Value},
20};
21
22use crate::base::{Res, Visibility, Void};
23
/// SurrealDB namespace selected by `Store::init` via `use_ns`.
const NAMESPACE: &str = "conclave";
/// SurrealDB database selected by `Store::init` via `use_db`.
const DATABASE: &str = "conclave";
27
/// Schema statements that `Store::init` runs each time a store is opened.
///
/// Each statement defines a unique index and uses `IF NOT EXISTS`, so the
/// script can be re-applied to a database that already has the indexes:
///
/// * `user.username` is unique,
/// * `machine.pubkey` is unique across all machines,
/// * `machine.(user, name)` is unique, i.e. names are unique per user,
/// * `channel.name` is unique,
/// * `invite.token` is unique.
const SCHEMA: &str = "\
DEFINE INDEX IF NOT EXISTS user_username ON user FIELDS username UNIQUE;
DEFINE INDEX IF NOT EXISTS machine_pubkey ON machine FIELDS pubkey UNIQUE;
DEFINE INDEX IF NOT EXISTS machine_user_name ON machine FIELDS user, name UNIQUE;
DEFINE INDEX IF NOT EXISTS channel_name ON channel FIELDS name UNIQUE;
DEFINE INDEX IF NOT EXISTS invite_token ON invite FIELDS token UNIQUE;
";
37
/// A row of the `user` table, as written by `Store::create_user` and read
/// back (with the record `id` omitted) by the user queries.
#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
pub struct UserRecord {
    /// Username; covered by the unique `user_username` index.
    pub username: String,
    /// Creation time as an RFC 3339 string (filled from `now_rfc3339`).
    pub created_at: String,
}
46
/// A row of the `machine` table, as written by `Store::create_machine` and
/// read back (with the record `id` omitted) by the machine queries.
#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
pub struct MachineRecord {
    /// Owner of the machine: the `user` value given to `Store::create_machine`.
    pub user: String,
    /// Machine name; unique together with `user` (`machine_user_name` index).
    pub name: String,
    /// Public key; unique across all machines (`machine_pubkey` index).
    /// Presumably base64 text, going by the `pubkey_base64` parameter name.
    pub pubkey: String,
    /// Registration time as an RFC 3339 string (filled from `now_rfc3339`).
    pub added_at: String,
}
60
/// A row of the `channel` table, as written by `Store::create_channel` and
/// read back (with the record `id` omitted) by the channel queries.
#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
pub struct ChannelRecord {
    /// Channel name; covered by the unique `channel_name` index.
    pub name: String,
    /// String form of a `Visibility`, produced with `Visibility::as_str`.
    pub visibility: String,
    /// Access-control list. `Store::create_channel` seeds it with the
    /// creator; `Store::set_channel_acl` replaces it wholesale.
    pub acl: Vec<String>,
    /// The `created_by` value given to `Store::create_channel`.
    pub created_by: String,
    /// Creation time as an RFC 3339 string (filled from `now_rfc3339`).
    pub created_at: String,
}
75
/// A row of the `invite` table, as written by `Store::create_invite` and
/// read back (with the record `id` omitted) by `Store::get_invite`.
#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
pub struct InviteRecord {
    /// Name of the channel the invite is for, as given to `create_invite`.
    pub channel: String,
    /// Invite token; covered by the unique `invite_token` index.
    pub token: String,
    /// Remaining number of uses; overwritten by `Store::set_invite_uses`.
    /// NOTE(review): `None` presumably means "unlimited" — confirm with callers.
    pub uses_remaining: Option<i64>,
    /// Optional expiry, stored as an opaque string by this module.
    /// NOTE(review): presumably an RFC 3339 timestamp — confirm with callers.
    pub expires_at: Option<String>,
    /// The `created_by` value given to `Store::create_invite`.
    pub created_by: String,
}
90
/// Query bindings for statements that filter on `$username`.
#[derive(SurrealValue)]
struct ByUsername {
    // Referenced in queries as `$username`.
    username: String,
}
97
/// Query bindings for statements that filter on `$pubkey`.
#[derive(SurrealValue)]
struct ByPubkey {
    // Referenced in queries as `$pubkey`.
    pubkey: String,
}
102
/// Query bindings for statements that filter on `$user`.
#[derive(SurrealValue)]
struct ByUser {
    // Referenced in queries as `$user`.
    user: String,
}
107
/// Query bindings for statements that filter on `$name`.
#[derive(SurrealValue)]
struct ByName {
    // Referenced in queries as `$name`.
    name: String,
}
112
/// Query bindings for statements that filter invites on `$tok`.
#[derive(SurrealValue)]
struct ByToken {
    // Referenced in queries as `$tok`, compared against the `token` column.
    // NOTE(review): presumably abbreviated because `$token` is a reserved
    // SurrealQL parameter — confirm before renaming.
    tok: String,
}
118
/// Query bindings for statements that filter on both `$user` and `$name`.
#[derive(SurrealValue)]
struct ByUserAndName {
    // Referenced in queries as `$user`.
    user: String,
    // Referenced in queries as `$name`.
    name: String,
}
124
/// Query bindings for the channel ACL update.
#[derive(SurrealValue)]
struct SetAcl {
    // `$name`: selects the channel to update.
    name: String,
    // `$acl`: the replacement access-control list.
    acl: Vec<String>,
}
130
/// Query bindings for the channel visibility update.
#[derive(SurrealValue)]
struct SetVisibility {
    // `$name`: selects the channel to update.
    name: String,
    // `$visibility`: the new visibility, already converted to its string form.
    visibility: String,
}
136
/// Query bindings for the channel rename.
#[derive(SurrealValue)]
struct Rename {
    // `$old`: the current channel name to match.
    old: String,
    // `$new`: the name to store instead.
    new: String,
}
142
/// Query bindings for the invite use-count update.
#[derive(SurrealValue)]
struct SetUses {
    // `$tok`: selects the invite by its `token` column.
    // NOTE(review): presumably abbreviated because `$token` is a reserved
    // SurrealQL parameter — confirm before renaming.
    tok: String,
    // `$uses`: the value written to `uses_remaining`.
    uses: i64,
}
149
/// Persistence layer for users, machines, channels and invites, backed by a
/// local (embedded) SurrealDB engine.
///
/// Construct one with `Store::open` (on-disk) or `Store::open_in_memory`.
pub struct Store {
    // Connection to the local engine; namespace, database and schema are
    // already set up by `Store::init` before a `Store` is handed out.
    db: Surreal<Db>,
}
154
155impl Store {
156 pub async fn open(path: &Path) -> Res<Self> {
162 let db = Surreal::new::<SurrealKv>(path.to_string_lossy().as_ref()).await.context("failed to open the embedded store")?;
163 Self::init(db).await
164 }
165
166 pub async fn open_in_memory() -> Res<Self> {
172 let db = Surreal::new::<Mem>(()).await.context("failed to open the in-memory store")?;
173 Self::init(db).await
174 }
175
176 async fn init(db: Surreal<Db>) -> Res<Self> {
177 db.use_ns(NAMESPACE).use_db(DATABASE).await.context("failed to select namespace/database")?;
178 db.query(SCHEMA).await.context("failed to apply schema")?.check().context("schema application reported an error")?;
179 Ok(Self { db })
180 }
181
182 async fn insert<T: SurrealValue>(&self, table: &str, record: T) -> Void {
183 let _created: Option<Value> = self.db.create(table.to_owned()).content(record).await.with_context(|| format!("failed to insert into `{table}`"))?;
184 Ok(())
185 }
186
187 pub async fn create_user(&self, username: &str) -> Res<UserRecord> {
193 let record = UserRecord {
194 username: username.to_owned(),
195 created_at: now_rfc3339(),
196 };
197 self.insert("user", record.clone()).await?;
198 Ok(record)
199 }
200
201 pub async fn get_user(&self, username: &str) -> Res<Option<UserRecord>> {
207 let mut response = self
208 .db
209 .query("SELECT * OMIT id FROM user WHERE username = $username")
210 .bind(ByUsername { username: username.to_owned() })
211 .await
212 .context("failed to query user")?;
213 let rows: Vec<UserRecord> = response.take(0).context("failed to decode user rows")?;
214 Ok(rows.into_iter().next())
215 }
216
217 pub async fn create_machine(&self, user: &str, name: &str, pubkey_base64: &str) -> Res<MachineRecord> {
224 let record = MachineRecord {
225 user: user.to_owned(),
226 name: name.to_owned(),
227 pubkey: pubkey_base64.to_owned(),
228 added_at: now_rfc3339(),
229 };
230 self.insert("machine", record.clone()).await?;
231 Ok(record)
232 }
233
234 pub async fn get_machine_by_pubkey(&self, pubkey_base64: &str) -> Res<Option<MachineRecord>> {
240 let mut response = self
241 .db
242 .query("SELECT * OMIT id FROM machine WHERE pubkey = $pubkey")
243 .bind(ByPubkey { pubkey: pubkey_base64.to_owned() })
244 .await
245 .context("failed to query machine")?;
246 let rows: Vec<MachineRecord> = response.take(0).context("failed to decode machine rows")?;
247 Ok(rows.into_iter().next())
248 }
249
250 pub async fn list_machines(&self, user: &str) -> Res<Vec<MachineRecord>> {
256 let mut response = self
257 .db
258 .query("SELECT * OMIT id FROM machine WHERE user = $user")
259 .bind(ByUser { user: user.to_owned() })
260 .await
261 .context("failed to list machines")?;
262 response.take(0).context("failed to decode machine rows")
263 }
264
265 pub async fn delete_machine(&self, user: &str, name: &str) -> Void {
271 self.db
272 .query("DELETE machine WHERE user = $user AND name = $name")
273 .bind(ByUserAndName {
274 user: user.to_owned(),
275 name: name.to_owned(),
276 })
277 .await
278 .context("failed to delete machine")?
279 .check()
280 .context("machine delete reported an error")?;
281 Ok(())
282 }
283
284 pub async fn create_channel(&self, name: &str, visibility: Visibility, created_by: &str) -> Res<ChannelRecord> {
290 let record = ChannelRecord {
291 name: name.to_owned(),
292 visibility: visibility.as_str().to_owned(),
293 acl: vec![created_by.to_owned()],
294 created_by: created_by.to_owned(),
295 created_at: now_rfc3339(),
296 };
297 self.insert("channel", record.clone()).await?;
298 Ok(record)
299 }
300
301 pub async fn get_channel(&self, name: &str) -> Res<Option<ChannelRecord>> {
307 let mut response = self
308 .db
309 .query("SELECT * OMIT id FROM channel WHERE name = $name")
310 .bind(ByName { name: name.to_owned() })
311 .await
312 .context("failed to query channel")?;
313 let rows: Vec<ChannelRecord> = response.take(0).context("failed to decode channel rows")?;
314 Ok(rows.into_iter().next())
315 }
316
317 pub async fn create_invite(&self, channel: &str, token: &str, uses_remaining: Option<i64>, expires_at: Option<String>, created_by: &str) -> Res<InviteRecord> {
323 let record = InviteRecord {
324 channel: channel.to_owned(),
325 token: token.to_owned(),
326 uses_remaining,
327 expires_at,
328 created_by: created_by.to_owned(),
329 };
330 self.insert("invite", record.clone()).await?;
331 Ok(record)
332 }
333
334 pub async fn get_invite(&self, token: &str) -> Res<Option<InviteRecord>> {
340 let mut response = self
341 .db
342 .query("SELECT * OMIT id FROM invite WHERE token = $tok")
343 .bind(ByToken { tok: token.to_owned() })
344 .await
345 .context("failed to query invite")?;
346 let rows: Vec<InviteRecord> = response.take(0).context("failed to decode invite rows")?;
347 Ok(rows.into_iter().next())
348 }
349
350 pub async fn list_channels(&self) -> Res<Vec<ChannelRecord>> {
356 let mut response = self.db.query("SELECT * OMIT id FROM channel").await.context("failed to list channels")?;
357 response.take(0).context("failed to decode channel rows")
358 }
359
360 pub async fn set_channel_acl(&self, name: &str, acl: &[String]) -> Void {
366 self.db
367 .query("UPDATE channel SET acl = $acl WHERE name = $name")
368 .bind(SetAcl { name: name.to_owned(), acl: acl.to_vec() })
369 .await
370 .context("failed to update channel acl")?
371 .check()
372 .context("channel acl update reported an error")?;
373 Ok(())
374 }
375
376 pub async fn set_channel_visibility(&self, name: &str, visibility: Visibility) -> Void {
382 self.db
383 .query("UPDATE channel SET visibility = $visibility WHERE name = $name")
384 .bind(SetVisibility {
385 name: name.to_owned(),
386 visibility: visibility.as_str().to_owned(),
387 })
388 .await
389 .context("failed to update channel visibility")?
390 .check()
391 .context("channel visibility update reported an error")?;
392 Ok(())
393 }
394
395 pub async fn rename_channel(&self, old: &str, new: &str) -> Void {
401 self.db
402 .query("UPDATE channel SET name = $new WHERE name = $old")
403 .bind(Rename { old: old.to_owned(), new: new.to_owned() })
404 .await
405 .context("failed to rename channel")?
406 .check()
407 .context("channel rename reported an error")?;
408 Ok(())
409 }
410
411 pub async fn delete_channel(&self, name: &str) -> Void {
417 self.db
418 .query("DELETE channel WHERE name = $name")
419 .bind(ByName { name: name.to_owned() })
420 .await
421 .context("failed to delete channel")?
422 .check()
423 .context("channel delete reported an error")?;
424 Ok(())
425 }
426
427 pub async fn set_invite_uses(&self, token: &str, uses_remaining: i64) -> Void {
433 self.db
434 .query("UPDATE invite SET uses_remaining = $uses WHERE token = $tok")
435 .bind(SetUses {
436 tok: token.to_owned(),
437 uses: uses_remaining,
438 })
439 .await
440 .context("failed to update invite uses")?
441 .check()
442 .context("invite uses update reported an error")?;
443 Ok(())
444 }
445
446 pub async fn delete_invite(&self, token: &str) -> Void {
452 self.db
453 .query("DELETE invite WHERE token = $tok")
454 .bind(ByToken { tok: token.to_owned() })
455 .await
456 .context("failed to delete invite")?
457 .check()
458 .context("invite delete reported an error")?;
459 Ok(())
460 }
461
462 pub async fn list_users(&self) -> Res<Vec<UserRecord>> {
468 let mut response = self.db.query("SELECT * OMIT id FROM user").await.context("failed to list users")?;
469 response.take(0).context("failed to decode user rows")
470 }
471
472 pub async fn delete_user(&self, username: &str) -> Void {
478 self.db
479 .query("DELETE user WHERE username = $username")
480 .bind(ByUsername { username: username.to_owned() })
481 .await
482 .context("failed to delete user")?
483 .check()
484 .context("user delete reported an error")?;
485 Ok(())
486 }
487}
488
489fn now_rfc3339() -> String {
490 chrono::Utc::now().to_rfc3339()
491}
492
#[cfg(test)]
mod tests {
    // Tests unwrap freely: a failed setup step should abort the test.
    #![allow(clippy::unwrap_used)]

    use super::*;
    use pretty_assertions::assert_eq;

    /// Builds a fresh in-memory store for a single test.
    async fn store() -> Store {
        let opened = Store::open_in_memory().await;
        opened.unwrap()
    }

    #[tokio::test]
    async fn user_create_and_fetch_round_trip() {
        let store = store().await;
        let aaron = store.create_user("aaron").await.unwrap();

        let found = store.get_user("aaron").await.unwrap();
        assert_eq!(found, Some(aaron));

        let missing = store.get_user("nobody").await.unwrap();
        assert_eq!(missing, None);
    }

    #[tokio::test]
    async fn duplicate_username_is_rejected() {
        let store = store().await;
        store.create_user("aaron").await.unwrap();

        let second_attempt = store.create_user("aaron").await;
        assert!(second_attempt.is_err(), "the unique-username constraint must reject a duplicate");
    }

    #[tokio::test]
    async fn machine_pubkey_is_globally_unique() {
        let store = store().await;
        store.create_machine("aaron", "workstation", "PUBKEY-A").await.unwrap();

        // Same key, different owner and name: still a conflict.
        let reused_key = store.create_machine("david", "desktop", "PUBKEY-A").await;
        assert!(reused_key.is_err());
    }

    #[tokio::test]
    async fn machine_name_is_unique_within_a_user_but_not_across_users() {
        let store = store().await;
        store.create_machine("aaron", "workstation", "PUBKEY-A").await.unwrap();

        let same_user = store.create_machine("aaron", "workstation", "PUBKEY-B").await;
        assert!(same_user.is_err());

        let other_user = store.create_machine("david", "workstation", "PUBKEY-C").await;
        other_user.unwrap();
    }

    #[tokio::test]
    async fn machines_list_and_delete_for_a_user() {
        let store = store().await;
        store.create_machine("aaron", "workstation", "PUBKEY-A").await.unwrap();
        store.create_machine("aaron", "sno-box", "PUBKEY-B").await.unwrap();

        let before = store.list_machines("aaron").await.unwrap();
        assert_eq!(before.len(), 2);

        store.delete_machine("aaron", "sno-box").await.unwrap();

        let after = store.list_machines("aaron").await.unwrap();
        let names: Vec<&str> = after.iter().map(|machine| machine.name.as_str()).collect();
        assert_eq!(names, vec!["workstation"]);
    }

    #[tokio::test]
    async fn channel_create_fetch_and_unique_name() {
        let store = store().await;
        let ops = store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        assert_eq!(ops.visibility, "private");

        let fetched = store.get_channel("ops").await.unwrap();
        assert_eq!(fetched, Some(ops));

        let duplicate = store.create_channel("ops", Visibility::Public, "david").await;
        assert!(duplicate.is_err());
    }

    #[tokio::test]
    async fn invite_create_fetch_and_unique_token() {
        let store = store().await;
        let invite = store.create_invite("ops", "tok-123", Some(5), None, "aaron").await.unwrap();

        let fetched = store.get_invite("tok-123").await.unwrap();
        assert_eq!(fetched, Some(invite));

        let duplicate = store.create_invite("ops", "tok-123", None, None, "aaron").await;
        assert!(duplicate.is_err());
    }

    #[tokio::test]
    async fn channel_acl_can_be_replaced() {
        let store = store().await;
        store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();

        let replacement = vec!["aaron".to_owned(), "david".to_owned()];
        store.set_channel_acl("ops", &replacement).await.unwrap();

        let channel = store.get_channel("ops").await.unwrap().unwrap();
        assert_eq!(channel.acl, replacement);
    }

    #[tokio::test]
    async fn channel_visibility_can_be_changed() {
        let store = store().await;
        store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();

        store.set_channel_visibility("ops", Visibility::Public).await.unwrap();

        let channel = store.get_channel("ops").await.unwrap().unwrap();
        assert_eq!(channel.visibility, "public");
    }

    #[tokio::test]
    async fn channel_rename_moves_the_record_and_respects_uniqueness() {
        let store = store().await;
        store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        store.create_channel("taken", Visibility::Public, "aaron").await.unwrap();

        store.rename_channel("ops", "operations").await.unwrap();

        let under_old_name = store.get_channel("ops").await.unwrap();
        assert!(under_old_name.is_none());
        let under_new_name = store.get_channel("operations").await.unwrap();
        assert!(under_new_name.is_some());

        // Renaming onto an existing channel name must fail.
        let collision = store.rename_channel("operations", "taken").await;
        assert!(collision.is_err());
    }

    #[tokio::test]
    async fn channel_can_be_deleted_and_listed() {
        let store = store().await;
        store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        store.create_channel("lobby", Visibility::Public, "aaron").await.unwrap();

        let before = store.list_channels().await.unwrap();
        assert_eq!(before.len(), 2);

        store.delete_channel("ops").await.unwrap();

        let after = store.list_channels().await.unwrap();
        let names: Vec<&str> = after.iter().map(|channel| channel.name.as_str()).collect();
        assert_eq!(names, vec!["lobby"]);
    }

    #[tokio::test]
    async fn invite_uses_can_be_decremented_and_revoked() {
        let store = store().await;
        store.create_invite("ops", "tok-123", Some(5), None, "aaron").await.unwrap();

        store.set_invite_uses("tok-123", 4).await.unwrap();
        let updated = store.get_invite("tok-123").await.unwrap().unwrap();
        assert_eq!(updated.uses_remaining, Some(4));

        store.delete_invite("tok-123").await.unwrap();
        let revoked = store.get_invite("tok-123").await.unwrap();
        assert!(revoked.is_none());
    }

    #[tokio::test]
    async fn users_can_be_listed_and_deleted() {
        let store = store().await;
        store.create_user("aaron").await.unwrap();
        store.create_user("david").await.unwrap();

        let before = store.list_users().await.unwrap();
        assert_eq!(before.len(), 2);

        store.delete_user("david").await.unwrap();

        let after = store.list_users().await.unwrap();
        let usernames: Vec<&str> = after.iter().map(|user| user.username.as_str()).collect();
        assert_eq!(usernames, vec!["aaron"]);
    }
}