Skip to main content

conclavelib/
store.rs

1//! Embedded `SurrealDB` store: durable config only, behind a thin per-table repository.
2//!
3//! The store is `SurrealDB` **embedded** — the official SDK with a local KV backend, so
4//! `conclave serve` stays a single self-contained binary with a data directory and no external DB
5//! process (DESIGN.md §15). [`Store::open`] uses the pure-Rust `SurrealKV` backend for persistence;
6//! [`Store::open_in_memory`] backs hermetic tests.
7//!
8//! There is no ORM: storage records are the SDK's own typed layer (`SurrealValue`), and this module
//! maps between them and the domain types. Only durable config lives here (`user`, `machine`,
//! `channel`, `invite`, plus the normalized `membership` and `ban` tables and the single-record
//! `meta` table, with the uniqueness constraints from DESIGN.md §15); presence,
//! subscriptions, permission levels, and the admin allowlist are deliberately not persisted.
12
13use std::path::Path;
14
15use anyhow::Context as _;
16use surrealdb::{
17    Surreal,
18    engine::local::{Db, Mem, SurrealKv},
19    types::{SurrealValue, Value},
20};
21
22use crate::base::{Res, Visibility, Void};
23
/// The single namespace the embedded store selects when it is opened.
const NAMESPACE: &str = "conclave";
/// The single database (inside [`NAMESPACE`]) the embedded store selects when it is opened.
const DATABASE: &str = "conclave";
27
/// Schema definition run at open: the uniqueness constraints from DESIGN.md §15 (one unique index
/// per lookup key on `user`, `machine`, `channel`, `invite`, `membership`, and `ban`) plus the
/// declaration of the `meta` table. Every statement uses `IF NOT EXISTS`, which keeps re-opening a
/// persistent store idempotent.
const SCHEMA: &str = "\
DEFINE INDEX IF NOT EXISTS user_username ON user FIELDS username UNIQUE;
DEFINE INDEX IF NOT EXISTS machine_pubkey ON machine FIELDS pubkey UNIQUE;
DEFINE INDEX IF NOT EXISTS machine_user_name ON machine FIELDS user, name UNIQUE;
DEFINE INDEX IF NOT EXISTS channel_name ON channel FIELDS name UNIQUE;
DEFINE INDEX IF NOT EXISTS invite_token ON invite FIELDS token UNIQUE;
DEFINE INDEX IF NOT EXISTS membership_channel_user ON membership FIELDS channel, user UNIQUE;
DEFINE INDEX IF NOT EXISTS ban_channel_user ON ban FIELDS channel, user UNIQUE;
DEFINE TABLE IF NOT EXISTS meta;
";
40
/// Server-lifetime metadata, held as the single fixed record `meta:server` (PRD-0012 T-003).
///
/// Written once by [`Store::instance_id`] on first boot and only read afterwards.
#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
pub struct MetaRecord {
    /// The server's persistent random instance identifier.
    pub instance_id: String,
}
47
/// A registered account (`username` unique per server, DESIGN.md §15).
///
/// Stored in the `user` table; uniqueness is enforced by the `user_username` index.
#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
pub struct UserRecord {
    /// The account name (the unique lookup key).
    pub username: String,
    /// RFC 3339 creation timestamp.
    pub created_at: String,
}
56
/// An enrolled machine keypair under a user (`pubkey` globally unique; `name` unique within the
/// user, DESIGN.md §5, §15).
///
/// Stored in the `machine` table; the `machine_pubkey` and `machine_user_name` indexes enforce the
/// two uniqueness rules.
#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
pub struct MachineRecord {
    /// The owning username.
    pub user: String,
    /// The machine name (unique within the user).
    pub name: String,
    /// The machine's public key, base64-encoded (globally unique).
    pub pubkey: String,
    /// RFC 3339 enrollment timestamp.
    pub added_at: String,
}
70
/// A channel (`name` unique, DESIGN.md §6, §15). Membership (the ACL) is normalized into the
/// `membership` table rather than an embedded array, so concurrent joins insert distinct records
/// instead of contending on one row (PRD-0007 T-003).
///
/// Stored in the `channel` table; uniqueness is enforced by the `channel_name` index.
#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
pub struct ChannelRecord {
    /// The channel name (the unique lookup key).
    pub name: String,
    /// The visibility tier token (see [`Visibility::as_str`]).
    pub visibility: String,
    /// The creating (and administering) user.
    pub created_by: String,
    /// RFC 3339 creation timestamp.
    pub created_at: String,
}
85
/// An invite token for a channel (`token` unique, DESIGN.md §6, §15).
///
/// Stored in the `invite` table; uniqueness is enforced by the `invite_token` index.
#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
pub struct InviteRecord {
    /// The channel the token grants access to.
    pub channel: String,
    /// The opaque token string (the unique lookup key).
    pub token: String,
    /// Remaining redemptions, or unlimited if absent.
    pub uses_remaining: Option<i64>,
    /// RFC 3339 expiry, or non-expiring if absent.
    pub expires_at: Option<String>,
    /// The creating user.
    pub created_by: String,
}
100
// Query variable bindings (SurrealDB 3.x binds a `SurrealValue` object of variables). Each field
// name is the `$variable` name the corresponding query text refers to.

/// Binds `$username` (user lookup).
#[derive(SurrealValue)]
struct ByUsername {
    username: String,
}

/// Binds `$pubkey` (machine lookup by public key).
#[derive(SurrealValue)]
struct ByPubkey {
    pubkey: String,
}

/// Binds `$user` (per-user listings and deletes).
#[derive(SurrealValue)]
struct ByUser {
    user: String,
}

/// Binds `$name` (channel lookup and delete by name).
#[derive(SurrealValue)]
struct ByName {
    name: String,
}

/// Binds `$tok` (invite lookup and redemption by token).
#[derive(SurrealValue)]
struct ByToken {
    // `token` is a protected variable name in SurrealQL, so bind under `tok`.
    tok: String,
}

/// Binds `$user` and `$name` (machine delete by its per-user name).
#[derive(SurrealValue)]
struct ByUserAndName {
    user: String,
    name: String,
}

/// Binds `$name` and `$visibility` (channel visibility update).
#[derive(SurrealValue)]
struct SetVisibility {
    name: String,
    visibility: String,
}

/// Binds `$old` and `$new` (channel rename and the dependent-table migrations).
#[derive(SurrealValue)]
struct Rename {
    old: String,
    new: String,
}

/// Binds `$tok` and `$uses` (explicit invite use-count update).
#[derive(SurrealValue)]
struct SetUses {
    // `token` is a protected variable name in SurrealQL, so bind under `tok`.
    tok: String,
    uses: i64,
}

/// Binds `$channel` and `$user`. Shared by the `membership` and `ban` queries, and also used as
/// the row shape when decoding `SELECT channel, user FROM ban`.
#[derive(SurrealValue)]
struct Membership {
    channel: String,
    user: String,
}

/// Binds `$channel` (per-channel listings and cascade deletes).
#[derive(SurrealValue)]
struct ByChannel {
    channel: String,
}
164
/// A bounded cap on optimistic-concurrency retries: high enough to clear realistic contention on a
/// single channel record, low enough that a genuinely stuck write still surfaces (DESIGN.md §15).
///
/// The value counts total attempts (the first try included), not additional retries.
const MAX_WRITE_ATTEMPTS: usize = 64;
168
169/// Whether a `SurrealDB` error is an optimistic-concurrency write-write conflict (`SurrealKV`
170/// surfaces `TransactionWriteConflict`; `SurrealDB` maps it to `TransactionConflict`, both rendering
171/// with "conflict"). These are expected under concurrent load and must be retried per `SurrealDB`'s
172/// optimistic-concurrency contract — the loser of a same-key write re-applies its statement — rather
173/// than serialized behind an application lock (DESIGN.md §15).
174fn is_write_conflict(err: &surrealdb::Error) -> bool {
175    err.to_string().to_lowercase().contains("conflict")
176}
177
/// The embedded store: a thin typed repository over an embedded `SurrealDB` instance. Cloning
/// yields another handle to the same database (the inner client is a shared handle).
///
/// Construct one with [`Store::open`] (persistent) or [`Store::open_in_memory`] (ephemeral).
#[derive(Clone)]
pub struct Store {
    /// The embedded database client, already switched to the store's namespace and database.
    db: Surreal<Db>,
}
184
185impl Store {
186    /// Opens (or creates) a persistent store rooted at `path` using the `SurrealKV` backend.
187    ///
188    /// # Errors
189    ///
190    /// Returns an error if the backend cannot be opened or the schema cannot be applied.
191    pub async fn open(path: &Path) -> Res<Self> {
192        let db = Surreal::new::<SurrealKv>(path.to_string_lossy().as_ref()).await.context("failed to open the embedded store")?;
193        Self::init(db).await
194    }
195
196    /// Opens an ephemeral in-memory store (for tests).
197    ///
198    /// # Errors
199    ///
200    /// Returns an error if the in-memory backend cannot be initialized.
201    pub async fn open_in_memory() -> Res<Self> {
202        let db = Surreal::new::<Mem>(()).await.context("failed to open the in-memory store")?;
203        Self::init(db).await
204    }
205
206    async fn init(db: Surreal<Db>) -> Res<Self> {
207        db.use_ns(NAMESPACE).use_db(DATABASE).await.context("failed to select namespace/database")?;
208        db.query(SCHEMA).await.context("failed to apply schema")?.check().context("schema application reported an error")?;
209        Ok(Self { db })
210    }
211
212    async fn insert<T: SurrealValue>(&self, table: &str, record: T) -> Void {
213        let _created: Option<Value> = self.db.create(table.to_owned()).content(record).await.with_context(|| format!("failed to insert into `{table}`"))?;
214        Ok(())
215    }
216
217    /// Returns the server's persistent instance ID, generating and storing one on first call.
218    ///
219    /// The ID is what lets a bridge recognize the same server reached under two different URLs
220    /// (PRD-0012 T-003); it is random, carries no meaning, and never changes for a data dir.
221    ///
222    /// # Errors
223    ///
224    /// Returns an error if the read or the first-boot write fails.
225    pub async fn instance_id(&self) -> Res<String> {
226        let mut response = self.db.query("SELECT * OMIT id FROM meta").await.context("failed to query server meta")?;
227        let rows: Vec<MetaRecord> = response.take(0).context("failed to decode server meta")?;
228        if let Some(meta) = rows.into_iter().next() {
229            return Ok(meta.instance_id);
230        }
231
232        let id = crate::identity::generate_token()?;
233        // A fixed record id makes first-boot generation race-free: a concurrent second writer
234        // errors on the existing record and re-reads instead of minting a divergent ID.
235        let created: Result<Option<MetaRecord>, _> = self.db.create(("meta", "server")).content(MetaRecord { instance_id: id.clone() }).await;
236        if created.is_ok() {
237            return Ok(id);
238        }
239        let mut response = self.db.query("SELECT * OMIT id FROM meta").await.context("failed to re-query server meta")?;
240        let rows: Vec<MetaRecord> = response.take(0).context("failed to decode server meta")?;
241        rows.into_iter().next().map(|meta| meta.instance_id).context("server meta write raced but no record exists")
242    }
243
244    /// Creates a user, enforcing the unique-username constraint.
245    ///
246    /// # Errors
247    ///
248    /// Returns an error if the username is already taken or the write fails.
249    pub async fn create_user(&self, username: &str) -> Res<UserRecord> {
250        let record = UserRecord {
251            username: username.to_owned(),
252            created_at: now_rfc3339(),
253        };
254        self.insert("user", record.clone()).await?;
255        Ok(record)
256    }
257
258    /// Fetches a user by username.
259    ///
260    /// # Errors
261    ///
262    /// Returns an error if the query fails.
263    pub async fn get_user(&self, username: &str) -> Res<Option<UserRecord>> {
264        let mut response = self
265            .db
266            .query("SELECT * OMIT id FROM user WHERE username = $username")
267            .bind(ByUsername { username: username.to_owned() })
268            .await
269            .context("failed to query user")?;
270        let rows: Vec<UserRecord> = response.take(0).context("failed to decode user rows")?;
271        Ok(rows.into_iter().next())
272    }
273
274    /// Enrolls a machine, enforcing the globally-unique pubkey and per-user-unique name constraints.
275    ///
276    /// # Errors
277    ///
278    /// Returns an error if the pubkey is already enrolled, the name collides within the user, or the
279    /// write fails.
280    pub async fn create_machine(&self, user: &str, name: &str, pubkey_base64: &str) -> Res<MachineRecord> {
281        let record = MachineRecord {
282            user: user.to_owned(),
283            name: name.to_owned(),
284            pubkey: pubkey_base64.to_owned(),
285            added_at: now_rfc3339(),
286        };
287        self.insert("machine", record.clone()).await?;
288        Ok(record)
289    }
290
291    /// Fetches a machine by its base64 public key.
292    ///
293    /// # Errors
294    ///
295    /// Returns an error if the query fails.
296    pub async fn get_machine_by_pubkey(&self, pubkey_base64: &str) -> Res<Option<MachineRecord>> {
297        let mut response = self
298            .db
299            .query("SELECT * OMIT id FROM machine WHERE pubkey = $pubkey")
300            .bind(ByPubkey { pubkey: pubkey_base64.to_owned() })
301            .await
302            .context("failed to query machine")?;
303        let rows: Vec<MachineRecord> = response.take(0).context("failed to decode machine rows")?;
304        Ok(rows.into_iter().next())
305    }
306
307    /// Lists the machines enrolled under a user.
308    ///
309    /// # Errors
310    ///
311    /// Returns an error if the query fails.
312    pub async fn list_machines(&self, user: &str) -> Res<Vec<MachineRecord>> {
313        let mut response = self
314            .db
315            .query("SELECT * OMIT id FROM machine WHERE user = $user")
316            .bind(ByUser { user: user.to_owned() })
317            .await
318            .context("failed to list machines")?;
319        response.take(0).context("failed to decode machine rows")
320    }
321
322    /// Revokes a machine by `(user, name)`.
323    ///
324    /// # Errors
325    ///
326    /// Returns an error if the delete fails.
327    pub async fn delete_machine(&self, user: &str, name: &str) -> Void {
328        self.db
329            .query("DELETE machine WHERE user = $user AND name = $name")
330            .bind(ByUserAndName {
331                user: user.to_owned(),
332                name: name.to_owned(),
333            })
334            .await
335            .context("failed to delete machine")?
336            .check()
337            .context("machine delete reported an error")?;
338        Ok(())
339    }
340
341    /// Creates a channel, enforcing the unique-name constraint.
342    ///
343    /// # Errors
344    ///
345    /// Returns an error if the name is already taken or the write fails.
346    pub async fn create_channel(&self, name: &str, visibility: Visibility, created_by: &str) -> Res<ChannelRecord> {
347        let record = ChannelRecord {
348            name: name.to_owned(),
349            visibility: visibility.as_str().to_owned(),
350            created_by: created_by.to_owned(),
351            created_at: now_rfc3339(),
352        };
353        self.insert("channel", record.clone()).await?;
354        // The creator is the channel's first member (it also administers via `created_by`).
355        self.add_channel_member(name, created_by).await?;
356        Ok(record)
357    }
358
359    /// Fetches a channel by name.
360    ///
361    /// # Errors
362    ///
363    /// Returns an error if the query fails.
364    pub async fn get_channel(&self, name: &str) -> Res<Option<ChannelRecord>> {
365        let mut response = self
366            .db
367            .query("SELECT * OMIT id FROM channel WHERE name = $name")
368            .bind(ByName { name: name.to_owned() })
369            .await
370            .context("failed to query channel")?;
371        let rows: Vec<ChannelRecord> = response.take(0).context("failed to decode channel rows")?;
372        Ok(rows.into_iter().next())
373    }
374
375    /// Creates an invite token, enforcing the unique-token constraint.
376    ///
377    /// # Errors
378    ///
379    /// Returns an error if the token already exists or the write fails.
380    pub async fn create_invite(&self, channel: &str, token: &str, uses_remaining: Option<i64>, expires_at: Option<String>, created_by: &str) -> Res<InviteRecord> {
381        let record = InviteRecord {
382            channel: channel.to_owned(),
383            token: token.to_owned(),
384            uses_remaining,
385            expires_at,
386            created_by: created_by.to_owned(),
387        };
388        self.insert("invite", record.clone()).await?;
389        Ok(record)
390    }
391
392    /// Fetches an invite by token.
393    ///
394    /// # Errors
395    ///
396    /// Returns an error if the query fails.
397    pub async fn get_invite(&self, token: &str) -> Res<Option<InviteRecord>> {
398        let mut response = self
399            .db
400            .query("SELECT * OMIT id FROM invite WHERE token = $tok")
401            .bind(ByToken { tok: token.to_owned() })
402            .await
403            .context("failed to query invite")?;
404        let rows: Vec<InviteRecord> = response.take(0).context("failed to decode invite rows")?;
405        Ok(rows.into_iter().next())
406    }
407
408    /// The outstanding invites for one channel (the channel-admin audit view).
409    ///
410    /// # Errors
411    ///
412    /// Returns an error if the query fails.
413    pub async fn list_invites(&self, channel: &str) -> Res<Vec<InviteRecord>> {
414        let mut response = self
415            .db
416            .query("SELECT * OMIT id FROM invite WHERE channel = $channel")
417            .bind(ByChannel { channel: channel.to_owned() })
418            .await
419            .context("failed to list invites")?;
420        response.take(0).context("failed to decode invite rows")
421    }
422
423    /// Lists every channel; the caller applies visibility / membership gating (DESIGN.md §6).
424    ///
425    /// # Errors
426    ///
427    /// Returns an error if the query fails.
428    pub async fn list_channels(&self) -> Res<Vec<ChannelRecord>> {
429        let mut response = self.db.query("SELECT * OMIT id FROM channel").await.context("failed to list channels")?;
430        response.take(0).context("failed to decode channel rows")
431    }
432
433    /// Adds `user` to a channel's membership (its ACL), idempotently. Each membership is its own
434    /// record under the unique `(channel, user)` index, so concurrent adds of different users write
435    /// distinct keys and never contend on a shared row (PRD-0007 T-003); a conflict on the same pair
436    /// is retried per `SurrealDB`'s optimistic-concurrency contract.
437    ///
438    /// # Errors
439    ///
440    /// Returns an error if the write keeps conflicting past the retry cap or otherwise fails.
441    pub async fn add_channel_member(&self, channel: &str, user: &str) -> Void {
442        for attempt in 0..MAX_WRITE_ATTEMPTS {
443            let outcome = self
444                .db
445                .query("INSERT INTO membership { channel: $channel, user: $user } ON DUPLICATE KEY UPDATE channel = $channel")
446                .bind(Membership {
447                    channel: channel.to_owned(),
448                    user: user.to_owned(),
449                })
450                .await
451                .and_then(surrealdb::IndexedResults::check);
452            match outcome {
453                Ok(_) => return Ok(()),
454                Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
455                Err(e) => return Err(anyhow::Error::new(e).context("failed to add channel member")),
456            }
457        }
458        anyhow::bail!("adding a channel member exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
459    }
460
461    /// Removes `user` from a channel's membership; idempotent (removing a non-member is a no-op).
462    ///
463    /// # Errors
464    ///
465    /// Returns an error if the delete keeps conflicting past the retry cap or otherwise fails.
466    pub async fn remove_channel_member(&self, channel: &str, user: &str) -> Void {
467        for attempt in 0..MAX_WRITE_ATTEMPTS {
468            let outcome = self
469                .db
470                .query("DELETE membership WHERE channel = $channel AND user = $user")
471                .bind(Membership {
472                    channel: channel.to_owned(),
473                    user: user.to_owned(),
474                })
475                .await
476                .and_then(surrealdb::IndexedResults::check);
477            match outcome {
478                Ok(_) => return Ok(()),
479                Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
480                Err(e) => return Err(anyhow::Error::new(e).context("failed to remove channel member")),
481            }
482        }
483        anyhow::bail!("removing a channel member exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
484    }
485
486    /// Whether `user` is a member of `channel`.
487    ///
488    /// # Errors
489    ///
490    /// Returns an error if the query fails.
491    pub async fn is_channel_member(&self, channel: &str, user: &str) -> Res<bool> {
492        let mut response = self
493            .db
494            .query("SELECT VALUE user FROM membership WHERE channel = $channel AND user = $user")
495            .bind(Membership {
496                channel: channel.to_owned(),
497                user: user.to_owned(),
498            })
499            .await
500            .context("failed to query membership")?;
501        let rows: Vec<String> = response.take(0).context("failed to decode membership rows")?;
502        Ok(!rows.is_empty())
503    }
504
505    /// The channels `user` is a member of (for discovery gating, DESIGN.md §6).
506    ///
507    /// # Errors
508    ///
509    /// Returns an error if the query fails.
510    pub async fn list_user_memberships(&self, user: &str) -> Res<Vec<String>> {
511        let mut response = self
512            .db
513            .query("SELECT VALUE channel FROM membership WHERE user = $user")
514            .bind(ByUser { user: user.to_owned() })
515            .await
516            .context("failed to list user memberships")?;
517        response.take(0).context("failed to decode membership channels")
518    }
519
520    /// The members of a channel (its ACL users).
521    ///
522    /// # Errors
523    ///
524    /// Returns an error if the query fails.
525    pub async fn list_channel_members(&self, channel: &str) -> Res<Vec<String>> {
526        let mut response = self
527            .db
528            .query("SELECT VALUE user FROM membership WHERE channel = $channel")
529            .bind(ByChannel { channel: channel.to_owned() })
530            .await
531            .context("failed to list channel members")?;
532        response.take(0).context("failed to decode membership users")
533    }
534
535    /// Records a channel ban for `user`; idempotent (banning twice is a no-op). Durable so bans
536    /// survive a server restart; the hub mirrors them in memory for its lock-guarded checks.
537    ///
538    /// # Errors
539    ///
540    /// Returns an error if the insert keeps conflicting past the retry cap or otherwise fails.
541    pub async fn add_ban(&self, channel: &str, user: &str) -> Void {
542        for attempt in 0..MAX_WRITE_ATTEMPTS {
543            let outcome = self
544                .db
545                .query("INSERT INTO ban { channel: $channel, user: $user } ON DUPLICATE KEY UPDATE channel = $channel")
546                .bind(Membership {
547                    channel: channel.to_owned(),
548                    user: user.to_owned(),
549                })
550                .await
551                .and_then(surrealdb::IndexedResults::check);
552            match outcome {
553                Ok(_) => return Ok(()),
554                Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
555                Err(e) => return Err(anyhow::Error::new(e).context("failed to add ban")),
556            }
557        }
558        anyhow::bail!("adding a ban exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
559    }
560
561    /// Lifts a channel ban; idempotent (removing an absent ban is a no-op).
562    ///
563    /// # Errors
564    ///
565    /// Returns an error if the delete keeps conflicting past the retry cap or otherwise fails.
566    pub async fn remove_ban(&self, channel: &str, user: &str) -> Void {
567        for attempt in 0..MAX_WRITE_ATTEMPTS {
568            let outcome = self
569                .db
570                .query("DELETE ban WHERE channel = $channel AND user = $user")
571                .bind(Membership {
572                    channel: channel.to_owned(),
573                    user: user.to_owned(),
574                })
575                .await
576                .and_then(surrealdb::IndexedResults::check);
577            match outcome {
578                Ok(_) => return Ok(()),
579                Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
580                Err(e) => return Err(anyhow::Error::new(e).context("failed to remove ban")),
581            }
582        }
583        anyhow::bail!("removing a ban exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
584    }
585
586    /// Every persisted `(channel, user)` ban, for loading the hub's in-memory view at startup.
587    ///
588    /// # Errors
589    ///
590    /// Returns an error if the query fails.
591    pub async fn list_bans(&self) -> Res<Vec<(String, String)>> {
592        let mut response = self.db.query("SELECT channel, user FROM ban").await.context("failed to list bans")?;
593        let rows: Vec<Membership> = response.take(0).context("failed to decode ban rows")?;
594        Ok(rows.into_iter().map(|row| (row.channel, row.user)).collect())
595    }
596
597    /// The users banned from one channel (the channel-admin audit view).
598    ///
599    /// # Errors
600    ///
601    /// Returns an error if the query fails.
602    pub async fn list_channel_bans(&self, channel: &str) -> Res<Vec<String>> {
603        let mut response = self
604            .db
605            .query("SELECT VALUE user FROM ban WHERE channel = $channel")
606            .bind(ByChannel { channel: channel.to_owned() })
607            .await
608            .context("failed to list channel bans")?;
609        response.take(0).context("failed to decode ban users")
610    }
611
612    /// The names of the channels created (and administered) by `user`.
613    ///
614    /// # Errors
615    ///
616    /// Returns an error if the query fails.
617    pub async fn list_channels_created_by(&self, user: &str) -> Res<Vec<String>> {
618        let mut response = self
619            .db
620            .query("SELECT VALUE name FROM channel WHERE created_by = $user")
621            .bind(ByUser { user: user.to_owned() })
622            .await
623            .context("failed to list created channels")?;
624        response.take(0).context("failed to decode channel names")
625    }
626
627    /// Removes every membership held by `user` (used when a user is removed, DESIGN.md §7).
628    ///
629    /// # Errors
630    ///
631    /// Returns an error if the delete fails.
632    pub async fn delete_user_memberships(&self, user: &str) -> Void {
633        self.db
634            .query("DELETE membership WHERE user = $user")
635            .bind(ByUser { user: user.to_owned() })
636            .await
637            .context("failed to delete user memberships")?
638            .check()
639            .context("user membership delete reported an error")?;
640        Ok(())
641    }
642
643    /// Changes a channel's visibility tier.
644    ///
645    /// # Errors
646    ///
647    /// Returns an error if the update fails.
648    pub async fn set_channel_visibility(&self, name: &str, visibility: Visibility) -> Void {
649        self.db
650            .query("UPDATE channel SET visibility = $visibility WHERE name = $name")
651            .bind(SetVisibility {
652                name: name.to_owned(),
653                visibility: visibility.as_str().to_owned(),
654            })
655            .await
656            .context("failed to update channel visibility")?
657            .check()
658            .context("channel visibility update reported an error")?;
659        Ok(())
660    }
661
662    /// Renames a channel, enforcing the unique-name constraint on the new name.
663    ///
664    /// # Errors
665    ///
666    /// Returns an error if the new name is already taken or the update fails.
667    pub async fn rename_channel(&self, old: &str, new: &str) -> Void {
668        self.db
669            .query("UPDATE channel SET name = $new WHERE name = $old")
670            .bind(Rename { old: old.to_owned(), new: new.to_owned() })
671            .await
672            .context("failed to rename channel")?
673            .check()
674            .context("channel rename reported an error")?;
675        // Keep memberships and invites attached to the renamed channel (PRD-0007 T-004).
676        self.db
677            .query("UPDATE membership SET channel = $new WHERE channel = $old")
678            .bind(Rename { old: old.to_owned(), new: new.to_owned() })
679            .await
680            .context("failed to migrate channel memberships")?
681            .check()
682            .context("membership rename reported an error")?;
683        self.db
684            .query("UPDATE invite SET channel = $new WHERE channel = $old")
685            .bind(Rename { old: old.to_owned(), new: new.to_owned() })
686            .await
687            .context("failed to migrate channel invites")?
688            .check()
689            .context("invite rename reported an error")?;
690        self.db
691            .query("UPDATE ban SET channel = $new WHERE channel = $old")
692            .bind(Rename { old: old.to_owned(), new: new.to_owned() })
693            .await
694            .context("failed to migrate channel bans")?
695            .check()
696            .context("ban rename reported an error")?;
697        Ok(())
698    }
699
700    /// Deletes a channel.
701    ///
702    /// # Errors
703    ///
704    /// Returns an error if the delete fails.
705    pub async fn delete_channel(&self, name: &str) -> Void {
706        self.db
707            .query("DELETE channel WHERE name = $name")
708            .bind(ByName { name: name.to_owned() })
709            .await
710            .context("failed to delete channel")?
711            .check()
712            .context("channel delete reported an error")?;
713        // Drop the channel's memberships and invites so a future same-named channel cannot inherit
714        // them (invite cascade — PRD-0007 T-004, finding #5).
715        self.db
716            .query("DELETE membership WHERE channel = $channel")
717            .bind(ByChannel { channel: name.to_owned() })
718            .await
719            .context("failed to delete channel memberships")?
720            .check()
721            .context("membership delete reported an error")?;
722        self.db
723            .query("DELETE invite WHERE channel = $channel")
724            .bind(ByChannel { channel: name.to_owned() })
725            .await
726            .context("failed to delete channel invites")?
727            .check()
728            .context("invite delete reported an error")?;
729        self.db
730            .query("DELETE ban WHERE channel = $channel")
731            .bind(ByChannel { channel: name.to_owned() })
732            .await
733            .context("failed to delete channel bans")?
734            .check()
735            .context("ban delete reported an error")?;
736        Ok(())
737    }
738
739    /// Sets an invite's remaining redemptions (used when redeeming a limited-use token).
740    ///
741    /// # Errors
742    ///
743    /// Returns an error if the update fails.
744    pub async fn set_invite_uses(&self, token: &str, uses_remaining: i64) -> Void {
745        self.db
746            .query("UPDATE invite SET uses_remaining = $uses WHERE token = $tok")
747            .bind(SetUses {
748                tok: token.to_owned(),
749                uses: uses_remaining,
750            })
751            .await
752            .context("failed to update invite uses")?
753            .check()
754            .context("invite uses update reported an error")?;
755        Ok(())
756    }
757
758    /// Atomically consumes one redemption of a limited-use invite: decrements `uses_remaining` only
759    /// while it is positive, returning whether a use was claimed. The guarded single-statement update
760    /// (retried on an optimistic-concurrency conflict) makes concurrent redeemers of the last use
761    /// mutually exclusive, so a single-use token admits exactly one (PRD-0007 T-003). The caller
762    /// handles unlimited (`None`) tokens and expiry; an exhausted token is deleted.
763    ///
764    /// # Errors
765    ///
766    /// Returns an error if the update keeps conflicting past the retry cap or otherwise fails.
767    pub async fn try_consume_invite_use(&self, token: &str) -> Res<bool> {
768        for attempt in 0..MAX_WRITE_ATTEMPTS {
769            let outcome = self
770                .db
771                .query("UPDATE invite SET uses_remaining = uses_remaining - 1 WHERE token = $tok AND uses_remaining > 0 RETURN VALUE uses_remaining")
772                .bind(ByToken { tok: token.to_owned() })
773                .await
774                .and_then(surrealdb::IndexedResults::check);
775            match outcome {
776                Ok(mut response) => {
777                    let remaining: Vec<i64> = response.take(0).context("failed to decode invite uses")?;
778                    return match remaining.into_iter().next() {
779                        // No positive-use row matched — the token was already spent (or removed).
780                        None => Ok(false),
781                        // This redemption took the last use; delete the spent token.
782                        Some(0) => {
783                            self.delete_invite(token).await?;
784                            Ok(true)
785                        }
786                        // A use was consumed with more remaining.
787                        Some(_) => Ok(true),
788                    };
789                }
790                Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
791                Err(e) => return Err(anyhow::Error::new(e).context("failed to consume invite use")),
792            }
793        }
794        anyhow::bail!("consuming an invite use exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
795    }
796
797    /// Deletes an invite token (on revoke or when an exhausted token is redeemed).
798    ///
799    /// # Errors
800    ///
801    /// Returns an error if the delete fails.
802    pub async fn delete_invite(&self, token: &str) -> Void {
803        self.db
804            .query("DELETE invite WHERE token = $tok")
805            .bind(ByToken { tok: token.to_owned() })
806            .await
807            .context("failed to delete invite")?
808            .check()
809            .context("invite delete reported an error")?;
810        Ok(())
811    }
812
813    /// Lists every registered user (server-admin `user list`).
814    ///
815    /// # Errors
816    ///
817    /// Returns an error if the query fails.
818    pub async fn list_users(&self) -> Res<Vec<UserRecord>> {
819        let mut response = self.db.query("SELECT * OMIT id FROM user").await.context("failed to list users")?;
820        response.take(0).context("failed to decode user rows")
821    }
822
823    /// Deletes a user (server-admin `user remove`); the caller also revokes the user's machines.
824    ///
825    /// # Errors
826    ///
827    /// Returns an error if the delete fails.
828    pub async fn delete_user(&self, username: &str) -> Void {
829        self.db
830            .query("DELETE user WHERE username = $username")
831            .bind(ByUsername { username: username.to_owned() })
832            .await
833            .context("failed to delete user")?
834            .check()
835            .context("user delete reported an error")?;
836        Ok(())
837    }
838}
839
840fn now_rfc3339() -> String {
841    chrono::Utc::now().to_rfc3339()
842}
843
#[cfg(test)]
mod tests {
    // House convention (DESIGN.md §22): `unwrap_used` is relaxed inside tests.
    #![allow(clippy::unwrap_used)]

    use super::*;
    use pretty_assertions::assert_eq;

    /// Opens a fresh in-memory store for a single test.
    async fn store() -> Store {
        let opened = Store::open_in_memory().await;
        opened.unwrap()
    }

    #[tokio::test]
    async fn store_instance_id_is_stable_across_reopen() {
        let data_dir = tempfile::TempDir::new().unwrap();

        // The first handle lives only inside this block, so it is dropped before the reopen.
        let original_id = {
            let db = Store::open(data_dir.path()).await.unwrap();
            let generated = db.instance_id().await.unwrap();
            assert!(!generated.is_empty());
            // Generated once, returned verbatim thereafter.
            assert_eq!(db.instance_id().await.unwrap(), generated);
            generated
        };

        // The ID must survive a server restart — it is what lets a bridge recognize the same
        // server behind two URLs (PRD-0012 T-003). SurrealKV releases its file lock
        // asynchronously after drop, so the reopen polls (bounded) until the lock frees.
        let mut reopened = None;
        for _ in 0..50 {
            if let Ok(db) = Store::open(data_dir.path()).await {
                reopened = Some(db);
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }
        let Some(reopened) = reopened else {
            panic!("the store never became reopenable after drop");
        };
        assert_eq!(reopened.instance_id().await.unwrap(), original_id);
    }

    #[tokio::test]
    async fn user_create_and_fetch_round_trip() {
        let db = store().await;
        let aaron = db.create_user("aaron").await.unwrap();

        let found = db.get_user("aaron").await.unwrap();
        assert_eq!(found, Some(aaron));

        let missing = db.get_user("nobody").await.unwrap();
        assert_eq!(missing, None);
    }

    #[tokio::test]
    async fn duplicate_username_is_rejected() {
        let db = store().await;
        db.create_user("aaron").await.unwrap();

        let second = db.create_user("aaron").await;
        assert!(second.is_err(), "the unique-username constraint must reject a duplicate");
    }

    #[tokio::test]
    async fn machine_pubkey_is_globally_unique() {
        let db = store().await;
        db.create_machine("aaron", "workstation", "PUBKEY-A").await.unwrap();

        // Reusing the pubkey under another user and machine name must still fail.
        let reused_key = db.create_machine("david", "desktop", "PUBKEY-A").await;
        assert!(reused_key.is_err());
    }

    #[tokio::test]
    async fn machine_name_is_unique_within_a_user_but_not_across_users() {
        let db = store().await;
        db.create_machine("aaron", "workstation", "PUBKEY-A").await.unwrap();

        // Same user and name with a different key is rejected.
        let same_user = db.create_machine("aaron", "workstation", "PUBKEY-B").await;
        assert!(same_user.is_err());

        // A different user may reuse the machine name.
        db.create_machine("david", "workstation", "PUBKEY-C").await.unwrap();
    }

    #[tokio::test]
    async fn machines_list_and_delete_for_a_user() {
        let db = store().await;
        for (name, pubkey) in [("workstation", "PUBKEY-A"), ("sno-box", "PUBKEY-B")] {
            db.create_machine("aaron", name, pubkey).await.unwrap();
        }

        let before = db.list_machines("aaron").await.unwrap();
        assert_eq!(before.len(), 2);

        db.delete_machine("aaron", "sno-box").await.unwrap();

        let after = db.list_machines("aaron").await.unwrap();
        assert_eq!(after.len(), 1);
        assert_eq!(after[0].name, "workstation");
    }

    #[tokio::test]
    async fn channel_create_fetch_and_unique_name() {
        let db = store().await;
        let ops = db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        assert_eq!(ops.visibility, "private");

        let fetched = db.get_channel("ops").await.unwrap();
        assert_eq!(fetched, Some(ops));

        let clash = db.create_channel("ops", Visibility::Public, "david").await;
        assert!(clash.is_err());
    }

    #[tokio::test]
    async fn invite_create_fetch_and_unique_token() {
        let db = store().await;
        let invite = db.create_invite("ops", "tok-123", Some(5), None, "aaron").await.unwrap();

        let fetched = db.get_invite("tok-123").await.unwrap();
        assert_eq!(fetched, Some(invite));

        let reused_token = db.create_invite("ops", "tok-123", None, None, "aaron").await;
        assert!(reused_token.is_err());
    }

    #[tokio::test]
    async fn channel_membership_add_remove_and_list() {
        let db = store().await;
        db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();

        // The creator is seeded as the first member.
        let creator_is_member = db.is_channel_member("ops", "aaron").await.unwrap();
        assert!(creator_is_member);

        // Idempotent: re-adding an existing member is a no-op, not a duplicate or an error.
        for _ in 0..2 {
            db.add_channel_member("ops", "david").await.unwrap();
        }
        let david_is_member = db.is_channel_member("ops", "david").await.unwrap();
        assert!(david_is_member);

        let mut members = db.list_channel_members("ops").await.unwrap();
        members.sort_unstable();
        assert_eq!(members, vec![String::from("aaron"), String::from("david")]);

        let memberships = db.list_user_memberships("david").await.unwrap();
        assert_eq!(memberships, vec![String::from("ops")]);

        db.remove_channel_member("ops", "david").await.unwrap();
        let david_still_member = db.is_channel_member("ops", "david").await.unwrap();
        assert!(!david_still_member);
    }

    #[tokio::test]
    async fn channel_memberships_follow_delete_and_rename() {
        let db = store().await;
        db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        db.add_channel_member("ops", "david").await.unwrap();

        // Rename migrates memberships to the new channel name.
        db.rename_channel("ops", "operations").await.unwrap();
        let under_new_name = db.is_channel_member("operations", "david").await.unwrap();
        assert!(under_new_name);
        let under_old_name = db.is_channel_member("ops", "david").await.unwrap();
        assert!(!under_old_name);

        // Deleting a channel drops its memberships, so a future same-named channel cannot inherit them.
        db.delete_channel("operations").await.unwrap();
        let leftover = db.list_channel_members("operations").await.unwrap();
        assert!(leftover.is_empty());
    }

    #[tokio::test]
    async fn channel_visibility_can_be_changed() {
        let db = store().await;
        db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();

        db.set_channel_visibility("ops", Visibility::Public).await.unwrap();

        let channel = db.get_channel("ops").await.unwrap().unwrap();
        assert_eq!(channel.visibility, "public");
    }

    #[tokio::test]
    async fn channel_rename_moves_the_record_and_respects_uniqueness() {
        let db = store().await;
        db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        db.create_channel("taken", Visibility::Public, "aaron").await.unwrap();

        db.rename_channel("ops", "operations").await.unwrap();
        let old_name = db.get_channel("ops").await.unwrap();
        assert!(old_name.is_none());
        let new_name = db.get_channel("operations").await.unwrap();
        assert!(new_name.is_some());

        // Renaming onto an existing name is rejected by the unique index.
        let collision = db.rename_channel("operations", "taken").await;
        assert!(collision.is_err());
    }

    #[tokio::test]
    async fn channel_can_be_deleted_and_listed() {
        let db = store().await;
        db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        db.create_channel("lobby", Visibility::Public, "aaron").await.unwrap();

        let before = db.list_channels().await.unwrap();
        assert_eq!(before.len(), 2);

        db.delete_channel("ops").await.unwrap();

        let after = db.list_channels().await.unwrap();
        assert_eq!(after.len(), 1);
        assert_eq!(after[0].name, "lobby");
    }

    #[tokio::test]
    async fn invite_uses_can_be_decremented_and_revoked() {
        let db = store().await;
        db.create_invite("ops", "tok-123", Some(5), None, "aaron").await.unwrap();

        db.set_invite_uses("tok-123", 4).await.unwrap();
        let updated = db.get_invite("tok-123").await.unwrap().unwrap();
        assert_eq!(updated.uses_remaining, Some(4));

        db.delete_invite("tok-123").await.unwrap();
        let revoked = db.get_invite("tok-123").await.unwrap();
        assert!(revoked.is_none());
    }

    #[tokio::test]
    async fn invites_are_dropped_when_the_channel_is_deleted() {
        let db = store().await;
        db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        db.create_invite("ops", "tok", Some(5), None, "aaron").await.unwrap();

        db.delete_channel("ops").await.unwrap();

        let orphan = db.get_invite("tok").await.unwrap();
        assert!(
            orphan.is_none(),
            "deleting a channel must drop its invites so a future same-named channel cannot honor them"
        );
    }

    #[tokio::test]
    async fn invites_follow_a_channel_rename() {
        let db = store().await;
        db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        db.create_invite("ops", "tok", None, None, "aaron").await.unwrap();

        db.rename_channel("ops", "operations").await.unwrap();

        let invite = db.get_invite("tok").await.unwrap().unwrap();
        assert_eq!(invite.channel, "operations", "an invite must follow its renamed channel");
    }

    #[tokio::test]
    async fn ban_add_remove_and_list_round_trip() {
        // Builds the owned `(channel, user)` pair that `list_bans` results are compared against.
        let ban = |channel: &str, user: &str| (channel.to_owned(), user.to_owned());

        let db = store().await;
        db.create_channel("ops", Visibility::Public, "aaron").await.unwrap();

        db.add_ban("ops", "bob").await.unwrap();
        // Banning the same user again is idempotent.
        db.add_ban("ops", "bob").await.unwrap();
        db.add_ban("ops", "mallory").await.unwrap();

        let mut bans = db.list_bans().await.unwrap();
        bans.sort_unstable();
        assert_eq!(bans, vec![ban("ops", "bob"), ban("ops", "mallory")]);

        db.remove_ban("ops", "bob").await.unwrap();
        let after_unban = db.list_bans().await.unwrap();
        assert_eq!(after_unban, vec![ban("ops", "mallory")]);
    }

    #[tokio::test]
    async fn bans_follow_delete_and_rename() {
        let db = store().await;
        db.create_channel("ops", Visibility::Public, "aaron").await.unwrap();
        db.add_ban("ops", "bob").await.unwrap();

        // A rename migrates the ban with the channel.
        db.rename_channel("ops", "operations").await.unwrap();
        let after_rename = db.list_bans().await.unwrap();
        assert_eq!(after_rename, vec![(String::from("operations"), String::from("bob"))]);

        // A delete drops the channel's bans so a future same-named channel starts clean.
        db.delete_channel("operations").await.unwrap();
        let after_delete = db.list_bans().await.unwrap();
        assert!(after_delete.is_empty(), "deleting a channel must drop its bans");
    }

    #[tokio::test]
    async fn users_can_be_listed_and_deleted() {
        let db = store().await;
        for username in ["aaron", "david"] {
            db.create_user(username).await.unwrap();
        }

        let before = db.list_users().await.unwrap();
        assert_eq!(before.len(), 2);

        db.delete_user("david").await.unwrap();

        let after = db.list_users().await.unwrap();
        assert_eq!(after.len(), 1);
        assert_eq!(after[0].username, "aaron");
    }
}