Skip to main content

conclavelib/
store.rs

1//! Embedded `SurrealDB` store: durable config only, behind a thin per-table repository.
2//!
3//! The store is `SurrealDB` **embedded** — the official SDK with a local KV backend, so
4//! `conclave serve` stays a single self-contained binary with a data directory and no external DB
5//! process (DESIGN.md §15). [`Store::open`] uses the pure-Rust `SurrealKV` backend for persistence;
6//! [`Store::open_in_memory`] backs hermetic tests.
7//!
8//! There is no ORM: storage records are the SDK's own typed layer (`SurrealValue`), and this module
9//! maps between them and the domain types. Only durable config lives here (`user`, `machine`,
10//! `channel`, `invite`, with the uniqueness constraints from DESIGN.md §15); presence,
11//! subscriptions, permission levels, and the admin allowlist are deliberately not persisted.
12
13use std::path::Path;
14
15use anyhow::Context as _;
16use surrealdb::{
17    Surreal,
18    engine::local::{Db, Mem, SurrealKv},
19    types::{SurrealValue, Value},
20};
21
22use crate::base::{Res, Visibility, Void};
23
/// Namespace selected on the embedded instance when the store is initialized.
const NAMESPACE: &str = "conclave";
/// Database selected (within the namespace above) when the store is initialized.
const DATABASE: &str = "conclave";
27
/// Schema statements executed every time a store is opened.
///
/// Each statement defines one of the unique indexes required by DESIGN.md §15. Because every
/// definition is guarded by `IF NOT EXISTS`, running the script against an already-initialized
/// persistent store is idempotent.
const SCHEMA: &str = concat!(
    "DEFINE INDEX IF NOT EXISTS user_username ON user FIELDS username UNIQUE;\n",
    "DEFINE INDEX IF NOT EXISTS machine_pubkey ON machine FIELDS pubkey UNIQUE;\n",
    "DEFINE INDEX IF NOT EXISTS machine_user_name ON machine FIELDS user, name UNIQUE;\n",
    "DEFINE INDEX IF NOT EXISTS channel_name ON channel FIELDS name UNIQUE;\n",
    "DEFINE INDEX IF NOT EXISTS invite_token ON invite FIELDS token UNIQUE;\n",
);
37
38/// A registered account (`username` unique per server, DESIGN.md §15).
39#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
40pub struct UserRecord {
41    /// The account name.
42    pub username: String,
43    /// RFC 3339 creation timestamp.
44    pub created_at: String,
45}
46
47/// An enrolled machine keypair under a user (`pubkey` globally unique; `name` unique within the
48/// user, DESIGN.md §5, §15).
49#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
50pub struct MachineRecord {
51    /// The owning username.
52    pub user: String,
53    /// The machine name (unique within the user).
54    pub name: String,
55    /// The machine's public key, base64-encoded (globally unique).
56    pub pubkey: String,
57    /// RFC 3339 enrollment timestamp.
58    pub added_at: String,
59}
60
61/// A channel (`name` unique, DESIGN.md §6, §15).
62#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
63pub struct ChannelRecord {
64    /// The channel name.
65    pub name: String,
66    /// The visibility tier token (see [`Visibility::as_str`]).
67    pub visibility: String,
68    /// The user-level access-control list.
69    pub acl: Vec<String>,
70    /// The creating (and administering) user.
71    pub created_by: String,
72    /// RFC 3339 creation timestamp.
73    pub created_at: String,
74}
75
76/// An invite token for a channel (`token` unique, DESIGN.md §6, §15).
77#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
78pub struct InviteRecord {
79    /// The channel the token grants access to.
80    pub channel: String,
81    /// The opaque token string.
82    pub token: String,
83    /// Remaining redemptions, or unlimited if absent.
84    pub uses_remaining: Option<i64>,
85    /// RFC 3339 expiry, or non-expiring if absent.
86    pub expires_at: Option<String>,
87    /// The creating user.
88    pub created_by: String,
89}
90
91// Query variable bindings (SurrealDB 3.x binds a `SurrealValue` object of variables).
92
93#[derive(SurrealValue)]
94struct ByUsername {
95    username: String,
96}
97
98#[derive(SurrealValue)]
99struct ByPubkey {
100    pubkey: String,
101}
102
103#[derive(SurrealValue)]
104struct ByUser {
105    user: String,
106}
107
108#[derive(SurrealValue)]
109struct ByName {
110    name: String,
111}
112
113#[derive(SurrealValue)]
114struct ByToken {
115    // `token` is a protected variable name in SurrealQL, so bind under `tok`.
116    tok: String,
117}
118
119#[derive(SurrealValue)]
120struct ByUserAndName {
121    user: String,
122    name: String,
123}
124
125#[derive(SurrealValue)]
126struct SetAcl {
127    name: String,
128    acl: Vec<String>,
129}
130
131#[derive(SurrealValue)]
132struct SetVisibility {
133    name: String,
134    visibility: String,
135}
136
137#[derive(SurrealValue)]
138struct Rename {
139    old: String,
140    new: String,
141}
142
143#[derive(SurrealValue)]
144struct SetUses {
145    // `token` is a protected variable name in SurrealQL, so bind under `tok`.
146    tok: String,
147    uses: i64,
148}
149
150/// The embedded store: a thin typed repository over an embedded `SurrealDB` instance.
151pub struct Store {
152    db: Surreal<Db>,
153}
154
155impl Store {
156    /// Opens (or creates) a persistent store rooted at `path` using the `SurrealKV` backend.
157    ///
158    /// # Errors
159    ///
160    /// Returns an error if the backend cannot be opened or the schema cannot be applied.
161    pub async fn open(path: &Path) -> Res<Self> {
162        let db = Surreal::new::<SurrealKv>(path.to_string_lossy().as_ref()).await.context("failed to open the embedded store")?;
163        Self::init(db).await
164    }
165
166    /// Opens an ephemeral in-memory store (for tests).
167    ///
168    /// # Errors
169    ///
170    /// Returns an error if the in-memory backend cannot be initialized.
171    pub async fn open_in_memory() -> Res<Self> {
172        let db = Surreal::new::<Mem>(()).await.context("failed to open the in-memory store")?;
173        Self::init(db).await
174    }
175
176    async fn init(db: Surreal<Db>) -> Res<Self> {
177        db.use_ns(NAMESPACE).use_db(DATABASE).await.context("failed to select namespace/database")?;
178        db.query(SCHEMA).await.context("failed to apply schema")?.check().context("schema application reported an error")?;
179        Ok(Self { db })
180    }
181
182    async fn insert<T: SurrealValue>(&self, table: &str, record: T) -> Void {
183        let _created: Option<Value> = self.db.create(table.to_owned()).content(record).await.with_context(|| format!("failed to insert into `{table}`"))?;
184        Ok(())
185    }
186
187    /// Creates a user, enforcing the unique-username constraint.
188    ///
189    /// # Errors
190    ///
191    /// Returns an error if the username is already taken or the write fails.
192    pub async fn create_user(&self, username: &str) -> Res<UserRecord> {
193        let record = UserRecord {
194            username: username.to_owned(),
195            created_at: now_rfc3339(),
196        };
197        self.insert("user", record.clone()).await?;
198        Ok(record)
199    }
200
201    /// Fetches a user by username.
202    ///
203    /// # Errors
204    ///
205    /// Returns an error if the query fails.
206    pub async fn get_user(&self, username: &str) -> Res<Option<UserRecord>> {
207        let mut response = self
208            .db
209            .query("SELECT * OMIT id FROM user WHERE username = $username")
210            .bind(ByUsername { username: username.to_owned() })
211            .await
212            .context("failed to query user")?;
213        let rows: Vec<UserRecord> = response.take(0).context("failed to decode user rows")?;
214        Ok(rows.into_iter().next())
215    }
216
217    /// Enrolls a machine, enforcing the globally-unique pubkey and per-user-unique name constraints.
218    ///
219    /// # Errors
220    ///
221    /// Returns an error if the pubkey is already enrolled, the name collides within the user, or the
222    /// write fails.
223    pub async fn create_machine(&self, user: &str, name: &str, pubkey_base64: &str) -> Res<MachineRecord> {
224        let record = MachineRecord {
225            user: user.to_owned(),
226            name: name.to_owned(),
227            pubkey: pubkey_base64.to_owned(),
228            added_at: now_rfc3339(),
229        };
230        self.insert("machine", record.clone()).await?;
231        Ok(record)
232    }
233
234    /// Fetches a machine by its base64 public key.
235    ///
236    /// # Errors
237    ///
238    /// Returns an error if the query fails.
239    pub async fn get_machine_by_pubkey(&self, pubkey_base64: &str) -> Res<Option<MachineRecord>> {
240        let mut response = self
241            .db
242            .query("SELECT * OMIT id FROM machine WHERE pubkey = $pubkey")
243            .bind(ByPubkey { pubkey: pubkey_base64.to_owned() })
244            .await
245            .context("failed to query machine")?;
246        let rows: Vec<MachineRecord> = response.take(0).context("failed to decode machine rows")?;
247        Ok(rows.into_iter().next())
248    }
249
250    /// Lists the machines enrolled under a user.
251    ///
252    /// # Errors
253    ///
254    /// Returns an error if the query fails.
255    pub async fn list_machines(&self, user: &str) -> Res<Vec<MachineRecord>> {
256        let mut response = self
257            .db
258            .query("SELECT * OMIT id FROM machine WHERE user = $user")
259            .bind(ByUser { user: user.to_owned() })
260            .await
261            .context("failed to list machines")?;
262        response.take(0).context("failed to decode machine rows")
263    }
264
265    /// Revokes a machine by `(user, name)`.
266    ///
267    /// # Errors
268    ///
269    /// Returns an error if the delete fails.
270    pub async fn delete_machine(&self, user: &str, name: &str) -> Void {
271        self.db
272            .query("DELETE machine WHERE user = $user AND name = $name")
273            .bind(ByUserAndName {
274                user: user.to_owned(),
275                name: name.to_owned(),
276            })
277            .await
278            .context("failed to delete machine")?
279            .check()
280            .context("machine delete reported an error")?;
281        Ok(())
282    }
283
284    /// Creates a channel, enforcing the unique-name constraint.
285    ///
286    /// # Errors
287    ///
288    /// Returns an error if the name is already taken or the write fails.
289    pub async fn create_channel(&self, name: &str, visibility: Visibility, created_by: &str) -> Res<ChannelRecord> {
290        let record = ChannelRecord {
291            name: name.to_owned(),
292            visibility: visibility.as_str().to_owned(),
293            acl: vec![created_by.to_owned()],
294            created_by: created_by.to_owned(),
295            created_at: now_rfc3339(),
296        };
297        self.insert("channel", record.clone()).await?;
298        Ok(record)
299    }
300
301    /// Fetches a channel by name.
302    ///
303    /// # Errors
304    ///
305    /// Returns an error if the query fails.
306    pub async fn get_channel(&self, name: &str) -> Res<Option<ChannelRecord>> {
307        let mut response = self
308            .db
309            .query("SELECT * OMIT id FROM channel WHERE name = $name")
310            .bind(ByName { name: name.to_owned() })
311            .await
312            .context("failed to query channel")?;
313        let rows: Vec<ChannelRecord> = response.take(0).context("failed to decode channel rows")?;
314        Ok(rows.into_iter().next())
315    }
316
317    /// Creates an invite token, enforcing the unique-token constraint.
318    ///
319    /// # Errors
320    ///
321    /// Returns an error if the token already exists or the write fails.
322    pub async fn create_invite(&self, channel: &str, token: &str, uses_remaining: Option<i64>, expires_at: Option<String>, created_by: &str) -> Res<InviteRecord> {
323        let record = InviteRecord {
324            channel: channel.to_owned(),
325            token: token.to_owned(),
326            uses_remaining,
327            expires_at,
328            created_by: created_by.to_owned(),
329        };
330        self.insert("invite", record.clone()).await?;
331        Ok(record)
332    }
333
334    /// Fetches an invite by token.
335    ///
336    /// # Errors
337    ///
338    /// Returns an error if the query fails.
339    pub async fn get_invite(&self, token: &str) -> Res<Option<InviteRecord>> {
340        let mut response = self
341            .db
342            .query("SELECT * OMIT id FROM invite WHERE token = $tok")
343            .bind(ByToken { tok: token.to_owned() })
344            .await
345            .context("failed to query invite")?;
346        let rows: Vec<InviteRecord> = response.take(0).context("failed to decode invite rows")?;
347        Ok(rows.into_iter().next())
348    }
349
350    /// Lists every channel; the caller applies visibility / membership gating (DESIGN.md §6).
351    ///
352    /// # Errors
353    ///
354    /// Returns an error if the query fails.
355    pub async fn list_channels(&self) -> Res<Vec<ChannelRecord>> {
356        let mut response = self.db.query("SELECT * OMIT id FROM channel").await.context("failed to list channels")?;
357        response.take(0).context("failed to decode channel rows")
358    }
359
360    /// Replaces a channel's access-control list (e.g. after an ACL add / invite redeem).
361    ///
362    /// # Errors
363    ///
364    /// Returns an error if the update fails.
365    pub async fn set_channel_acl(&self, name: &str, acl: &[String]) -> Void {
366        self.db
367            .query("UPDATE channel SET acl = $acl WHERE name = $name")
368            .bind(SetAcl { name: name.to_owned(), acl: acl.to_vec() })
369            .await
370            .context("failed to update channel acl")?
371            .check()
372            .context("channel acl update reported an error")?;
373        Ok(())
374    }
375
376    /// Changes a channel's visibility tier.
377    ///
378    /// # Errors
379    ///
380    /// Returns an error if the update fails.
381    pub async fn set_channel_visibility(&self, name: &str, visibility: Visibility) -> Void {
382        self.db
383            .query("UPDATE channel SET visibility = $visibility WHERE name = $name")
384            .bind(SetVisibility {
385                name: name.to_owned(),
386                visibility: visibility.as_str().to_owned(),
387            })
388            .await
389            .context("failed to update channel visibility")?
390            .check()
391            .context("channel visibility update reported an error")?;
392        Ok(())
393    }
394
395    /// Renames a channel, enforcing the unique-name constraint on the new name.
396    ///
397    /// # Errors
398    ///
399    /// Returns an error if the new name is already taken or the update fails.
400    pub async fn rename_channel(&self, old: &str, new: &str) -> Void {
401        self.db
402            .query("UPDATE channel SET name = $new WHERE name = $old")
403            .bind(Rename { old: old.to_owned(), new: new.to_owned() })
404            .await
405            .context("failed to rename channel")?
406            .check()
407            .context("channel rename reported an error")?;
408        Ok(())
409    }
410
411    /// Deletes a channel.
412    ///
413    /// # Errors
414    ///
415    /// Returns an error if the delete fails.
416    pub async fn delete_channel(&self, name: &str) -> Void {
417        self.db
418            .query("DELETE channel WHERE name = $name")
419            .bind(ByName { name: name.to_owned() })
420            .await
421            .context("failed to delete channel")?
422            .check()
423            .context("channel delete reported an error")?;
424        Ok(())
425    }
426
427    /// Sets an invite's remaining redemptions (used when redeeming a limited-use token).
428    ///
429    /// # Errors
430    ///
431    /// Returns an error if the update fails.
432    pub async fn set_invite_uses(&self, token: &str, uses_remaining: i64) -> Void {
433        self.db
434            .query("UPDATE invite SET uses_remaining = $uses WHERE token = $tok")
435            .bind(SetUses {
436                tok: token.to_owned(),
437                uses: uses_remaining,
438            })
439            .await
440            .context("failed to update invite uses")?
441            .check()
442            .context("invite uses update reported an error")?;
443        Ok(())
444    }
445
446    /// Deletes an invite token (on revoke or when an exhausted token is redeemed).
447    ///
448    /// # Errors
449    ///
450    /// Returns an error if the delete fails.
451    pub async fn delete_invite(&self, token: &str) -> Void {
452        self.db
453            .query("DELETE invite WHERE token = $tok")
454            .bind(ByToken { tok: token.to_owned() })
455            .await
456            .context("failed to delete invite")?
457            .check()
458            .context("invite delete reported an error")?;
459        Ok(())
460    }
461
462    /// Lists every registered user (server-admin `user list`).
463    ///
464    /// # Errors
465    ///
466    /// Returns an error if the query fails.
467    pub async fn list_users(&self) -> Res<Vec<UserRecord>> {
468        let mut response = self.db.query("SELECT * OMIT id FROM user").await.context("failed to list users")?;
469        response.take(0).context("failed to decode user rows")
470    }
471
472    /// Deletes a user (server-admin `user remove`); the caller also revokes the user's machines.
473    ///
474    /// # Errors
475    ///
476    /// Returns an error if the delete fails.
477    pub async fn delete_user(&self, username: &str) -> Void {
478        self.db
479            .query("DELETE user WHERE username = $username")
480            .bind(ByUsername { username: username.to_owned() })
481            .await
482            .context("failed to delete user")?
483            .check()
484            .context("user delete reported an error")?;
485        Ok(())
486    }
487}
488
489fn now_rfc3339() -> String {
490    chrono::Utc::now().to_rfc3339()
491}
492
#[cfg(test)]
mod tests {
    // `unwrap` is allowed inside tests (house convention; DESIGN.md §22).
    #![allow(clippy::unwrap_used)]

    use super::*;
    use pretty_assertions::assert_eq;

    /// Builds a fresh in-memory store so each test starts from an empty database.
    async fn store() -> Store {
        Store::open_in_memory().await.unwrap()
    }

    #[tokio::test]
    async fn user_create_and_fetch_round_trip() {
        let db = store().await;
        let aaron = db.create_user("aaron").await.unwrap();

        let found = db.get_user("aaron").await.unwrap();
        assert_eq!(found, Some(aaron));

        let missing = db.get_user("nobody").await.unwrap();
        assert_eq!(missing, None);
    }

    #[tokio::test]
    async fn duplicate_username_is_rejected() {
        let db = store().await;
        db.create_user("aaron").await.unwrap();

        let second_attempt = db.create_user("aaron").await;
        assert!(second_attempt.is_err(), "the unique-username constraint must reject a duplicate");
    }

    #[tokio::test]
    async fn machine_pubkey_is_globally_unique() {
        let db = store().await;
        db.create_machine("aaron", "workstation", "PUBKEY-A").await.unwrap();

        // Reusing the key must fail even though both the user and the machine name differ.
        let reused_key = db.create_machine("david", "desktop", "PUBKEY-A").await;
        assert!(reused_key.is_err());
    }

    #[tokio::test]
    async fn machine_name_is_unique_within_a_user_but_not_across_users() {
        let db = store().await;
        db.create_machine("aaron", "workstation", "PUBKEY-A").await.unwrap();

        // Same user + same name collides, even with a fresh key.
        let same_user = db.create_machine("aaron", "workstation", "PUBKEY-B").await;
        assert!(same_user.is_err());

        // A different user may reuse the name.
        db.create_machine("david", "workstation", "PUBKEY-C").await.unwrap();
    }

    #[tokio::test]
    async fn machines_list_and_delete_for_a_user() {
        let db = store().await;
        db.create_machine("aaron", "workstation", "PUBKEY-A").await.unwrap();
        db.create_machine("aaron", "sno-box", "PUBKEY-B").await.unwrap();

        let before = db.list_machines("aaron").await.unwrap();
        assert_eq!(before.len(), 2);

        db.delete_machine("aaron", "sno-box").await.unwrap();

        let after = db.list_machines("aaron").await.unwrap();
        assert_eq!(after.len(), 1);
        assert_eq!(after[0].name, "workstation");
    }

    #[tokio::test]
    async fn channel_create_fetch_and_unique_name() {
        let db = store().await;
        let ops = db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        assert_eq!(ops.visibility, "private");

        let fetched = db.get_channel("ops").await.unwrap();
        assert_eq!(fetched, Some(ops));

        let duplicate = db.create_channel("ops", Visibility::Public, "david").await;
        assert!(duplicate.is_err());
    }

    #[tokio::test]
    async fn invite_create_fetch_and_unique_token() {
        let db = store().await;
        let invite = db.create_invite("ops", "tok-123", Some(5), None, "aaron").await.unwrap();

        let fetched = db.get_invite("tok-123").await.unwrap();
        assert_eq!(fetched, Some(invite));

        let duplicate = db.create_invite("ops", "tok-123", None, None, "aaron").await;
        assert!(duplicate.is_err());
    }

    #[tokio::test]
    async fn channel_acl_can_be_replaced() {
        let db = store().await;
        db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();

        let wanted = vec!["aaron".to_owned(), "david".to_owned()];
        db.set_channel_acl("ops", &wanted).await.unwrap();

        let stored = db.get_channel("ops").await.unwrap().unwrap();
        assert_eq!(stored.acl, wanted);
    }

    #[tokio::test]
    async fn channel_visibility_can_be_changed() {
        let db = store().await;
        db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();

        db.set_channel_visibility("ops", Visibility::Public).await.unwrap();

        let stored = db.get_channel("ops").await.unwrap().unwrap();
        assert_eq!(stored.visibility, "public");
    }

    #[tokio::test]
    async fn channel_rename_moves_the_record_and_respects_uniqueness() {
        let db = store().await;
        db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        db.create_channel("taken", Visibility::Public, "aaron").await.unwrap();

        db.rename_channel("ops", "operations").await.unwrap();

        let under_old_name = db.get_channel("ops").await.unwrap();
        assert!(under_old_name.is_none());
        let under_new_name = db.get_channel("operations").await.unwrap();
        assert!(under_new_name.is_some());

        // The unique index must refuse a rename onto a name that already exists.
        let clash = db.rename_channel("operations", "taken").await;
        assert!(clash.is_err());
    }

    #[tokio::test]
    async fn channel_can_be_deleted_and_listed() {
        let db = store().await;
        db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        db.create_channel("lobby", Visibility::Public, "aaron").await.unwrap();

        let before = db.list_channels().await.unwrap();
        assert_eq!(before.len(), 2);

        db.delete_channel("ops").await.unwrap();

        let after = db.list_channels().await.unwrap();
        assert_eq!(after.len(), 1);
        assert_eq!(after[0].name, "lobby");
    }

    #[tokio::test]
    async fn invite_uses_can_be_decremented_and_revoked() {
        let db = store().await;
        db.create_invite("ops", "tok-123", Some(5), None, "aaron").await.unwrap();

        db.set_invite_uses("tok-123", 4).await.unwrap();
        let updated = db.get_invite("tok-123").await.unwrap().unwrap();
        assert_eq!(updated.uses_remaining, Some(4));

        db.delete_invite("tok-123").await.unwrap();
        let gone = db.get_invite("tok-123").await.unwrap();
        assert!(gone.is_none());
    }

    #[tokio::test]
    async fn users_can_be_listed_and_deleted() {
        let db = store().await;
        db.create_user("aaron").await.unwrap();
        db.create_user("david").await.unwrap();

        let before = db.list_users().await.unwrap();
        assert_eq!(before.len(), 2);

        db.delete_user("david").await.unwrap();

        let after = db.list_users().await.unwrap();
        assert_eq!(after.len(), 1);
        assert_eq!(after[0].username, "aaron");
    }
}