Skip to main content

conclavelib/
store.rs

1//! Embedded `SurrealDB` store: durable config only, behind a thin per-table repository.
2//!
3//! The store is `SurrealDB` **embedded** — the official SDK with a local KV backend, so
4//! `conclave serve` stays a single self-contained binary with a data directory and no external DB
5//! process (DESIGN.md §15). [`Store::open`] uses the pure-Rust `SurrealKV` backend for persistence;
6//! [`Store::open_in_memory`] backs hermetic tests.
7//!
8//! There is no ORM: storage records are the SDK's own typed layer (`SurrealValue`), and this module
9//! maps between them and the domain types. Only durable config lives here (`user`, `machine`,
10//! `channel`, `invite`, with the uniqueness constraints from DESIGN.md §15); presence,
11//! subscriptions, permission levels, and the admin allowlist are deliberately not persisted.
12
13use std::path::Path;
14
15use anyhow::Context as _;
16use surrealdb::{
17    Surreal,
18    engine::local::{Db, Mem, SurrealKv},
19    types::{SurrealValue, Value},
20};
21
22use crate::base::{Res, Visibility, Void};
23
24/// The single namespace / database the embedded store uses.
25const NAMESPACE: &str = "conclave";
26const DATABASE: &str = "conclave";
27
28/// Schema definition run at open: the uniqueness constraints from DESIGN.md §15. `IF NOT EXISTS`
29/// keeps re-opening a persistent store idempotent.
30const SCHEMA: &str = "\
31DEFINE INDEX IF NOT EXISTS user_username ON user FIELDS username UNIQUE;
32DEFINE INDEX IF NOT EXISTS machine_pubkey ON machine FIELDS pubkey UNIQUE;
33DEFINE INDEX IF NOT EXISTS machine_user_name ON machine FIELDS user, name UNIQUE;
34DEFINE INDEX IF NOT EXISTS channel_name ON channel FIELDS name UNIQUE;
35DEFINE INDEX IF NOT EXISTS invite_token ON invite FIELDS token UNIQUE;
36DEFINE INDEX IF NOT EXISTS membership_channel_user ON membership FIELDS channel, user UNIQUE;
37DEFINE INDEX IF NOT EXISTS ban_channel_user ON ban FIELDS channel, user UNIQUE;
38";
39
40/// A registered account (`username` unique per server, DESIGN.md §15).
41#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
42pub struct UserRecord {
43    /// The account name.
44    pub username: String,
45    /// RFC 3339 creation timestamp.
46    pub created_at: String,
47}
48
49/// An enrolled machine keypair under a user (`pubkey` globally unique; `name` unique within the
50/// user, DESIGN.md §5, §15).
51#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
52pub struct MachineRecord {
53    /// The owning username.
54    pub user: String,
55    /// The machine name (unique within the user).
56    pub name: String,
57    /// The machine's public key, base64-encoded (globally unique).
58    pub pubkey: String,
59    /// RFC 3339 enrollment timestamp.
60    pub added_at: String,
61}
62
63/// A channel (`name` unique, DESIGN.md §6, §15). Membership (the ACL) is normalized into the
64/// `membership` table rather than an embedded array, so concurrent joins insert distinct records
65/// instead of contending on one row (PRD-0007 T-003).
66#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
67pub struct ChannelRecord {
68    /// The channel name.
69    pub name: String,
70    /// The visibility tier token (see [`Visibility::as_str`]).
71    pub visibility: String,
72    /// The creating (and administering) user.
73    pub created_by: String,
74    /// RFC 3339 creation timestamp.
75    pub created_at: String,
76}
77
78/// An invite token for a channel (`token` unique, DESIGN.md §6, §15).
79#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
80pub struct InviteRecord {
81    /// The channel the token grants access to.
82    pub channel: String,
83    /// The opaque token string.
84    pub token: String,
85    /// Remaining redemptions, or unlimited if absent.
86    pub uses_remaining: Option<i64>,
87    /// RFC 3339 expiry, or non-expiring if absent.
88    pub expires_at: Option<String>,
89    /// The creating user.
90    pub created_by: String,
91}
92
93// Query variable bindings (SurrealDB 3.x binds a `SurrealValue` object of variables).
94
95#[derive(SurrealValue)]
96struct ByUsername {
97    username: String,
98}
99
100#[derive(SurrealValue)]
101struct ByPubkey {
102    pubkey: String,
103}
104
105#[derive(SurrealValue)]
106struct ByUser {
107    user: String,
108}
109
110#[derive(SurrealValue)]
111struct ByName {
112    name: String,
113}
114
115#[derive(SurrealValue)]
116struct ByToken {
117    // `token` is a protected variable name in SurrealQL, so bind under `tok`.
118    tok: String,
119}
120
121#[derive(SurrealValue)]
122struct ByUserAndName {
123    user: String,
124    name: String,
125}
126
127#[derive(SurrealValue)]
128struct SetVisibility {
129    name: String,
130    visibility: String,
131}
132
133#[derive(SurrealValue)]
134struct Rename {
135    old: String,
136    new: String,
137}
138
139#[derive(SurrealValue)]
140struct SetUses {
141    // `token` is a protected variable name in SurrealQL, so bind under `tok`.
142    tok: String,
143    uses: i64,
144}
145
146#[derive(SurrealValue)]
147struct Membership {
148    channel: String,
149    user: String,
150}
151
152#[derive(SurrealValue)]
153struct ByChannel {
154    channel: String,
155}
156
157/// A bounded cap on optimistic-concurrency retries: high enough to clear realistic contention on a
158/// single channel record, low enough that a genuinely stuck write still surfaces (DESIGN.md §15).
159const MAX_WRITE_ATTEMPTS: usize = 64;
160
161/// Whether a `SurrealDB` error is an optimistic-concurrency write-write conflict (`SurrealKV`
162/// surfaces `TransactionWriteConflict`; `SurrealDB` maps it to `TransactionConflict`, both rendering
163/// with "conflict"). These are expected under concurrent load and must be retried per `SurrealDB`'s
164/// optimistic-concurrency contract — the loser of a same-key write re-applies its statement — rather
165/// than serialized behind an application lock (DESIGN.md §15).
166fn is_write_conflict(err: &surrealdb::Error) -> bool {
167    err.to_string().to_lowercase().contains("conflict")
168}
169
170/// The embedded store: a thin typed repository over an embedded `SurrealDB` instance. Cloning
171/// yields another handle to the same database (the inner client is a shared handle).
172#[derive(Clone)]
173pub struct Store {
174    db: Surreal<Db>,
175}
176
177impl Store {
178    /// Opens (or creates) a persistent store rooted at `path` using the `SurrealKV` backend.
179    ///
180    /// # Errors
181    ///
182    /// Returns an error if the backend cannot be opened or the schema cannot be applied.
183    pub async fn open(path: &Path) -> Res<Self> {
184        let db = Surreal::new::<SurrealKv>(path.to_string_lossy().as_ref()).await.context("failed to open the embedded store")?;
185        Self::init(db).await
186    }
187
188    /// Opens an ephemeral in-memory store (for tests).
189    ///
190    /// # Errors
191    ///
192    /// Returns an error if the in-memory backend cannot be initialized.
193    pub async fn open_in_memory() -> Res<Self> {
194        let db = Surreal::new::<Mem>(()).await.context("failed to open the in-memory store")?;
195        Self::init(db).await
196    }
197
198    async fn init(db: Surreal<Db>) -> Res<Self> {
199        db.use_ns(NAMESPACE).use_db(DATABASE).await.context("failed to select namespace/database")?;
200        db.query(SCHEMA).await.context("failed to apply schema")?.check().context("schema application reported an error")?;
201        Ok(Self { db })
202    }
203
204    async fn insert<T: SurrealValue>(&self, table: &str, record: T) -> Void {
205        let _created: Option<Value> = self.db.create(table.to_owned()).content(record).await.with_context(|| format!("failed to insert into `{table}`"))?;
206        Ok(())
207    }
208
209    /// Creates a user, enforcing the unique-username constraint.
210    ///
211    /// # Errors
212    ///
213    /// Returns an error if the username is already taken or the write fails.
214    pub async fn create_user(&self, username: &str) -> Res<UserRecord> {
215        let record = UserRecord {
216            username: username.to_owned(),
217            created_at: now_rfc3339(),
218        };
219        self.insert("user", record.clone()).await?;
220        Ok(record)
221    }
222
223    /// Fetches a user by username.
224    ///
225    /// # Errors
226    ///
227    /// Returns an error if the query fails.
228    pub async fn get_user(&self, username: &str) -> Res<Option<UserRecord>> {
229        let mut response = self
230            .db
231            .query("SELECT * OMIT id FROM user WHERE username = $username")
232            .bind(ByUsername { username: username.to_owned() })
233            .await
234            .context("failed to query user")?;
235        let rows: Vec<UserRecord> = response.take(0).context("failed to decode user rows")?;
236        Ok(rows.into_iter().next())
237    }
238
239    /// Enrolls a machine, enforcing the globally-unique pubkey and per-user-unique name constraints.
240    ///
241    /// # Errors
242    ///
243    /// Returns an error if the pubkey is already enrolled, the name collides within the user, or the
244    /// write fails.
245    pub async fn create_machine(&self, user: &str, name: &str, pubkey_base64: &str) -> Res<MachineRecord> {
246        let record = MachineRecord {
247            user: user.to_owned(),
248            name: name.to_owned(),
249            pubkey: pubkey_base64.to_owned(),
250            added_at: now_rfc3339(),
251        };
252        self.insert("machine", record.clone()).await?;
253        Ok(record)
254    }
255
256    /// Fetches a machine by its base64 public key.
257    ///
258    /// # Errors
259    ///
260    /// Returns an error if the query fails.
261    pub async fn get_machine_by_pubkey(&self, pubkey_base64: &str) -> Res<Option<MachineRecord>> {
262        let mut response = self
263            .db
264            .query("SELECT * OMIT id FROM machine WHERE pubkey = $pubkey")
265            .bind(ByPubkey { pubkey: pubkey_base64.to_owned() })
266            .await
267            .context("failed to query machine")?;
268        let rows: Vec<MachineRecord> = response.take(0).context("failed to decode machine rows")?;
269        Ok(rows.into_iter().next())
270    }
271
272    /// Lists the machines enrolled under a user.
273    ///
274    /// # Errors
275    ///
276    /// Returns an error if the query fails.
277    pub async fn list_machines(&self, user: &str) -> Res<Vec<MachineRecord>> {
278        let mut response = self
279            .db
280            .query("SELECT * OMIT id FROM machine WHERE user = $user")
281            .bind(ByUser { user: user.to_owned() })
282            .await
283            .context("failed to list machines")?;
284        response.take(0).context("failed to decode machine rows")
285    }
286
287    /// Revokes a machine by `(user, name)`.
288    ///
289    /// # Errors
290    ///
291    /// Returns an error if the delete fails.
292    pub async fn delete_machine(&self, user: &str, name: &str) -> Void {
293        self.db
294            .query("DELETE machine WHERE user = $user AND name = $name")
295            .bind(ByUserAndName {
296                user: user.to_owned(),
297                name: name.to_owned(),
298            })
299            .await
300            .context("failed to delete machine")?
301            .check()
302            .context("machine delete reported an error")?;
303        Ok(())
304    }
305
306    /// Creates a channel, enforcing the unique-name constraint.
307    ///
308    /// # Errors
309    ///
310    /// Returns an error if the name is already taken or the write fails.
311    pub async fn create_channel(&self, name: &str, visibility: Visibility, created_by: &str) -> Res<ChannelRecord> {
312        let record = ChannelRecord {
313            name: name.to_owned(),
314            visibility: visibility.as_str().to_owned(),
315            created_by: created_by.to_owned(),
316            created_at: now_rfc3339(),
317        };
318        self.insert("channel", record.clone()).await?;
319        // The creator is the channel's first member (it also administers via `created_by`).
320        self.add_channel_member(name, created_by).await?;
321        Ok(record)
322    }
323
324    /// Fetches a channel by name.
325    ///
326    /// # Errors
327    ///
328    /// Returns an error if the query fails.
329    pub async fn get_channel(&self, name: &str) -> Res<Option<ChannelRecord>> {
330        let mut response = self
331            .db
332            .query("SELECT * OMIT id FROM channel WHERE name = $name")
333            .bind(ByName { name: name.to_owned() })
334            .await
335            .context("failed to query channel")?;
336        let rows: Vec<ChannelRecord> = response.take(0).context("failed to decode channel rows")?;
337        Ok(rows.into_iter().next())
338    }
339
340    /// Creates an invite token, enforcing the unique-token constraint.
341    ///
342    /// # Errors
343    ///
344    /// Returns an error if the token already exists or the write fails.
345    pub async fn create_invite(&self, channel: &str, token: &str, uses_remaining: Option<i64>, expires_at: Option<String>, created_by: &str) -> Res<InviteRecord> {
346        let record = InviteRecord {
347            channel: channel.to_owned(),
348            token: token.to_owned(),
349            uses_remaining,
350            expires_at,
351            created_by: created_by.to_owned(),
352        };
353        self.insert("invite", record.clone()).await?;
354        Ok(record)
355    }
356
357    /// Fetches an invite by token.
358    ///
359    /// # Errors
360    ///
361    /// Returns an error if the query fails.
362    pub async fn get_invite(&self, token: &str) -> Res<Option<InviteRecord>> {
363        let mut response = self
364            .db
365            .query("SELECT * OMIT id FROM invite WHERE token = $tok")
366            .bind(ByToken { tok: token.to_owned() })
367            .await
368            .context("failed to query invite")?;
369        let rows: Vec<InviteRecord> = response.take(0).context("failed to decode invite rows")?;
370        Ok(rows.into_iter().next())
371    }
372
373    /// The outstanding invites for one channel (the channel-admin audit view).
374    ///
375    /// # Errors
376    ///
377    /// Returns an error if the query fails.
378    pub async fn list_invites(&self, channel: &str) -> Res<Vec<InviteRecord>> {
379        let mut response = self
380            .db
381            .query("SELECT * OMIT id FROM invite WHERE channel = $channel")
382            .bind(ByChannel { channel: channel.to_owned() })
383            .await
384            .context("failed to list invites")?;
385        response.take(0).context("failed to decode invite rows")
386    }
387
388    /// Lists every channel; the caller applies visibility / membership gating (DESIGN.md §6).
389    ///
390    /// # Errors
391    ///
392    /// Returns an error if the query fails.
393    pub async fn list_channels(&self) -> Res<Vec<ChannelRecord>> {
394        let mut response = self.db.query("SELECT * OMIT id FROM channel").await.context("failed to list channels")?;
395        response.take(0).context("failed to decode channel rows")
396    }
397
398    /// Adds `user` to a channel's membership (its ACL), idempotently. Each membership is its own
399    /// record under the unique `(channel, user)` index, so concurrent adds of different users write
400    /// distinct keys and never contend on a shared row (PRD-0007 T-003); a conflict on the same pair
401    /// is retried per `SurrealDB`'s optimistic-concurrency contract.
402    ///
403    /// # Errors
404    ///
405    /// Returns an error if the write keeps conflicting past the retry cap or otherwise fails.
406    pub async fn add_channel_member(&self, channel: &str, user: &str) -> Void {
407        for attempt in 0..MAX_WRITE_ATTEMPTS {
408            let outcome = self
409                .db
410                .query("INSERT INTO membership { channel: $channel, user: $user } ON DUPLICATE KEY UPDATE channel = $channel")
411                .bind(Membership {
412                    channel: channel.to_owned(),
413                    user: user.to_owned(),
414                })
415                .await
416                .and_then(surrealdb::IndexedResults::check);
417            match outcome {
418                Ok(_) => return Ok(()),
419                Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
420                Err(e) => return Err(anyhow::Error::new(e).context("failed to add channel member")),
421            }
422        }
423        anyhow::bail!("adding a channel member exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
424    }
425
426    /// Removes `user` from a channel's membership; idempotent (removing a non-member is a no-op).
427    ///
428    /// # Errors
429    ///
430    /// Returns an error if the delete keeps conflicting past the retry cap or otherwise fails.
431    pub async fn remove_channel_member(&self, channel: &str, user: &str) -> Void {
432        for attempt in 0..MAX_WRITE_ATTEMPTS {
433            let outcome = self
434                .db
435                .query("DELETE membership WHERE channel = $channel AND user = $user")
436                .bind(Membership {
437                    channel: channel.to_owned(),
438                    user: user.to_owned(),
439                })
440                .await
441                .and_then(surrealdb::IndexedResults::check);
442            match outcome {
443                Ok(_) => return Ok(()),
444                Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
445                Err(e) => return Err(anyhow::Error::new(e).context("failed to remove channel member")),
446            }
447        }
448        anyhow::bail!("removing a channel member exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
449    }
450
451    /// Whether `user` is a member of `channel`.
452    ///
453    /// # Errors
454    ///
455    /// Returns an error if the query fails.
456    pub async fn is_channel_member(&self, channel: &str, user: &str) -> Res<bool> {
457        let mut response = self
458            .db
459            .query("SELECT VALUE user FROM membership WHERE channel = $channel AND user = $user")
460            .bind(Membership {
461                channel: channel.to_owned(),
462                user: user.to_owned(),
463            })
464            .await
465            .context("failed to query membership")?;
466        let rows: Vec<String> = response.take(0).context("failed to decode membership rows")?;
467        Ok(!rows.is_empty())
468    }
469
470    /// The channels `user` is a member of (for discovery gating, DESIGN.md §6).
471    ///
472    /// # Errors
473    ///
474    /// Returns an error if the query fails.
475    pub async fn list_user_memberships(&self, user: &str) -> Res<Vec<String>> {
476        let mut response = self
477            .db
478            .query("SELECT VALUE channel FROM membership WHERE user = $user")
479            .bind(ByUser { user: user.to_owned() })
480            .await
481            .context("failed to list user memberships")?;
482        response.take(0).context("failed to decode membership channels")
483    }
484
485    /// The members of a channel (its ACL users).
486    ///
487    /// # Errors
488    ///
489    /// Returns an error if the query fails.
490    pub async fn list_channel_members(&self, channel: &str) -> Res<Vec<String>> {
491        let mut response = self
492            .db
493            .query("SELECT VALUE user FROM membership WHERE channel = $channel")
494            .bind(ByChannel { channel: channel.to_owned() })
495            .await
496            .context("failed to list channel members")?;
497        response.take(0).context("failed to decode membership users")
498    }
499
500    /// Records a channel ban for `user`; idempotent (banning twice is a no-op). Durable so bans
501    /// survive a server restart; the hub mirrors them in memory for its lock-guarded checks.
502    ///
503    /// # Errors
504    ///
505    /// Returns an error if the insert keeps conflicting past the retry cap or otherwise fails.
506    pub async fn add_ban(&self, channel: &str, user: &str) -> Void {
507        for attempt in 0..MAX_WRITE_ATTEMPTS {
508            let outcome = self
509                .db
510                .query("INSERT INTO ban { channel: $channel, user: $user } ON DUPLICATE KEY UPDATE channel = $channel")
511                .bind(Membership {
512                    channel: channel.to_owned(),
513                    user: user.to_owned(),
514                })
515                .await
516                .and_then(surrealdb::IndexedResults::check);
517            match outcome {
518                Ok(_) => return Ok(()),
519                Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
520                Err(e) => return Err(anyhow::Error::new(e).context("failed to add ban")),
521            }
522        }
523        anyhow::bail!("adding a ban exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
524    }
525
526    /// Lifts a channel ban; idempotent (removing an absent ban is a no-op).
527    ///
528    /// # Errors
529    ///
530    /// Returns an error if the delete keeps conflicting past the retry cap or otherwise fails.
531    pub async fn remove_ban(&self, channel: &str, user: &str) -> Void {
532        for attempt in 0..MAX_WRITE_ATTEMPTS {
533            let outcome = self
534                .db
535                .query("DELETE ban WHERE channel = $channel AND user = $user")
536                .bind(Membership {
537                    channel: channel.to_owned(),
538                    user: user.to_owned(),
539                })
540                .await
541                .and_then(surrealdb::IndexedResults::check);
542            match outcome {
543                Ok(_) => return Ok(()),
544                Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
545                Err(e) => return Err(anyhow::Error::new(e).context("failed to remove ban")),
546            }
547        }
548        anyhow::bail!("removing a ban exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
549    }
550
551    /// Every persisted `(channel, user)` ban, for loading the hub's in-memory view at startup.
552    ///
553    /// # Errors
554    ///
555    /// Returns an error if the query fails.
556    pub async fn list_bans(&self) -> Res<Vec<(String, String)>> {
557        let mut response = self.db.query("SELECT channel, user FROM ban").await.context("failed to list bans")?;
558        let rows: Vec<Membership> = response.take(0).context("failed to decode ban rows")?;
559        Ok(rows.into_iter().map(|row| (row.channel, row.user)).collect())
560    }
561
562    /// The users banned from one channel (the channel-admin audit view).
563    ///
564    /// # Errors
565    ///
566    /// Returns an error if the query fails.
567    pub async fn list_channel_bans(&self, channel: &str) -> Res<Vec<String>> {
568        let mut response = self
569            .db
570            .query("SELECT VALUE user FROM ban WHERE channel = $channel")
571            .bind(ByChannel { channel: channel.to_owned() })
572            .await
573            .context("failed to list channel bans")?;
574        response.take(0).context("failed to decode ban users")
575    }
576
577    /// The names of the channels created (and administered) by `user`.
578    ///
579    /// # Errors
580    ///
581    /// Returns an error if the query fails.
582    pub async fn list_channels_created_by(&self, user: &str) -> Res<Vec<String>> {
583        let mut response = self
584            .db
585            .query("SELECT VALUE name FROM channel WHERE created_by = $user")
586            .bind(ByUser { user: user.to_owned() })
587            .await
588            .context("failed to list created channels")?;
589        response.take(0).context("failed to decode channel names")
590    }
591
592    /// Removes every membership held by `user` (used when a user is removed, DESIGN.md §7).
593    ///
594    /// # Errors
595    ///
596    /// Returns an error if the delete fails.
597    pub async fn delete_user_memberships(&self, user: &str) -> Void {
598        self.db
599            .query("DELETE membership WHERE user = $user")
600            .bind(ByUser { user: user.to_owned() })
601            .await
602            .context("failed to delete user memberships")?
603            .check()
604            .context("user membership delete reported an error")?;
605        Ok(())
606    }
607
608    /// Changes a channel's visibility tier.
609    ///
610    /// # Errors
611    ///
612    /// Returns an error if the update fails.
613    pub async fn set_channel_visibility(&self, name: &str, visibility: Visibility) -> Void {
614        self.db
615            .query("UPDATE channel SET visibility = $visibility WHERE name = $name")
616            .bind(SetVisibility {
617                name: name.to_owned(),
618                visibility: visibility.as_str().to_owned(),
619            })
620            .await
621            .context("failed to update channel visibility")?
622            .check()
623            .context("channel visibility update reported an error")?;
624        Ok(())
625    }
626
627    /// Renames a channel, enforcing the unique-name constraint on the new name.
628    ///
629    /// # Errors
630    ///
631    /// Returns an error if the new name is already taken or the update fails.
632    pub async fn rename_channel(&self, old: &str, new: &str) -> Void {
633        self.db
634            .query("UPDATE channel SET name = $new WHERE name = $old")
635            .bind(Rename { old: old.to_owned(), new: new.to_owned() })
636            .await
637            .context("failed to rename channel")?
638            .check()
639            .context("channel rename reported an error")?;
640        // Keep memberships and invites attached to the renamed channel (PRD-0007 T-004).
641        self.db
642            .query("UPDATE membership SET channel = $new WHERE channel = $old")
643            .bind(Rename { old: old.to_owned(), new: new.to_owned() })
644            .await
645            .context("failed to migrate channel memberships")?
646            .check()
647            .context("membership rename reported an error")?;
648        self.db
649            .query("UPDATE invite SET channel = $new WHERE channel = $old")
650            .bind(Rename { old: old.to_owned(), new: new.to_owned() })
651            .await
652            .context("failed to migrate channel invites")?
653            .check()
654            .context("invite rename reported an error")?;
655        self.db
656            .query("UPDATE ban SET channel = $new WHERE channel = $old")
657            .bind(Rename { old: old.to_owned(), new: new.to_owned() })
658            .await
659            .context("failed to migrate channel bans")?
660            .check()
661            .context("ban rename reported an error")?;
662        Ok(())
663    }
664
665    /// Deletes a channel.
666    ///
667    /// # Errors
668    ///
669    /// Returns an error if the delete fails.
670    pub async fn delete_channel(&self, name: &str) -> Void {
671        self.db
672            .query("DELETE channel WHERE name = $name")
673            .bind(ByName { name: name.to_owned() })
674            .await
675            .context("failed to delete channel")?
676            .check()
677            .context("channel delete reported an error")?;
678        // Drop the channel's memberships and invites so a future same-named channel cannot inherit
679        // them (invite cascade — PRD-0007 T-004, finding #5).
680        self.db
681            .query("DELETE membership WHERE channel = $channel")
682            .bind(ByChannel { channel: name.to_owned() })
683            .await
684            .context("failed to delete channel memberships")?
685            .check()
686            .context("membership delete reported an error")?;
687        self.db
688            .query("DELETE invite WHERE channel = $channel")
689            .bind(ByChannel { channel: name.to_owned() })
690            .await
691            .context("failed to delete channel invites")?
692            .check()
693            .context("invite delete reported an error")?;
694        self.db
695            .query("DELETE ban WHERE channel = $channel")
696            .bind(ByChannel { channel: name.to_owned() })
697            .await
698            .context("failed to delete channel bans")?
699            .check()
700            .context("ban delete reported an error")?;
701        Ok(())
702    }
703
704    /// Sets an invite's remaining redemptions (used when redeeming a limited-use token).
705    ///
706    /// # Errors
707    ///
708    /// Returns an error if the update fails.
709    pub async fn set_invite_uses(&self, token: &str, uses_remaining: i64) -> Void {
710        self.db
711            .query("UPDATE invite SET uses_remaining = $uses WHERE token = $tok")
712            .bind(SetUses {
713                tok: token.to_owned(),
714                uses: uses_remaining,
715            })
716            .await
717            .context("failed to update invite uses")?
718            .check()
719            .context("invite uses update reported an error")?;
720        Ok(())
721    }
722
723    /// Atomically consumes one redemption of a limited-use invite: decrements `uses_remaining` only
724    /// while it is positive, returning whether a use was claimed. The guarded single-statement update
725    /// (retried on an optimistic-concurrency conflict) makes concurrent redeemers of the last use
726    /// mutually exclusive, so a single-use token admits exactly one (PRD-0007 T-003). The caller
727    /// handles unlimited (`None`) tokens and expiry; an exhausted token is deleted.
728    ///
729    /// # Errors
730    ///
731    /// Returns an error if the update keeps conflicting past the retry cap or otherwise fails.
732    pub async fn try_consume_invite_use(&self, token: &str) -> Res<bool> {
733        for attempt in 0..MAX_WRITE_ATTEMPTS {
734            let outcome = self
735                .db
736                .query("UPDATE invite SET uses_remaining = uses_remaining - 1 WHERE token = $tok AND uses_remaining > 0 RETURN VALUE uses_remaining")
737                .bind(ByToken { tok: token.to_owned() })
738                .await
739                .and_then(surrealdb::IndexedResults::check);
740            match outcome {
741                Ok(mut response) => {
742                    let remaining: Vec<i64> = response.take(0).context("failed to decode invite uses")?;
743                    return match remaining.into_iter().next() {
744                        // No positive-use row matched — the token was already spent (or removed).
745                        None => Ok(false),
746                        // This redemption took the last use; delete the spent token.
747                        Some(0) => {
748                            self.delete_invite(token).await?;
749                            Ok(true)
750                        }
751                        // A use was consumed with more remaining.
752                        Some(_) => Ok(true),
753                    };
754                }
755                Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
756                Err(e) => return Err(anyhow::Error::new(e).context("failed to consume invite use")),
757            }
758        }
759        anyhow::bail!("consuming an invite use exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
760    }
761
762    /// Deletes an invite token (on revoke or when an exhausted token is redeemed).
763    ///
764    /// # Errors
765    ///
766    /// Returns an error if the delete fails.
767    pub async fn delete_invite(&self, token: &str) -> Void {
768        self.db
769            .query("DELETE invite WHERE token = $tok")
770            .bind(ByToken { tok: token.to_owned() })
771            .await
772            .context("failed to delete invite")?
773            .check()
774            .context("invite delete reported an error")?;
775        Ok(())
776    }
777
778    /// Lists every registered user (server-admin `user list`).
779    ///
780    /// # Errors
781    ///
782    /// Returns an error if the query fails.
783    pub async fn list_users(&self) -> Res<Vec<UserRecord>> {
784        let mut response = self.db.query("SELECT * OMIT id FROM user").await.context("failed to list users")?;
785        response.take(0).context("failed to decode user rows")
786    }
787
788    /// Deletes a user (server-admin `user remove`); the caller also revokes the user's machines.
789    ///
790    /// # Errors
791    ///
792    /// Returns an error if the delete fails.
793    pub async fn delete_user(&self, username: &str) -> Void {
794        self.db
795            .query("DELETE user WHERE username = $username")
796            .bind(ByUsername { username: username.to_owned() })
797            .await
798            .context("failed to delete user")?
799            .check()
800            .context("user delete reported an error")?;
801        Ok(())
802    }
803}
804
805fn now_rfc3339() -> String {
806    chrono::Utc::now().to_rfc3339()
807}
808
809#[cfg(test)]
810mod tests {
811    // Tests relax `unwrap_used` (house convention; DESIGN.md §22).
812    #![allow(clippy::unwrap_used)]
813
814    use super::*;
815    use pretty_assertions::assert_eq;
816
817    async fn store() -> Store {
818        Store::open_in_memory().await.unwrap()
819    }
820
821    #[tokio::test]
822    async fn user_create_and_fetch_round_trip() {
823        let store = store().await;
824        let created = store.create_user("aaron").await.unwrap();
825
826        assert_eq!(store.get_user("aaron").await.unwrap(), Some(created));
827        assert_eq!(store.get_user("nobody").await.unwrap(), None);
828    }
829
830    #[tokio::test]
831    async fn duplicate_username_is_rejected() {
832        let store = store().await;
833        store.create_user("aaron").await.unwrap();
834
835        assert!(store.create_user("aaron").await.is_err(), "the unique-username constraint must reject a duplicate");
836    }
837
838    #[tokio::test]
839    async fn machine_pubkey_is_globally_unique() {
840        let store = store().await;
841        store.create_machine("aaron", "workstation", "PUBKEY-A").await.unwrap();
842
843        // Same pubkey under a different user/name must still be rejected.
844        assert!(store.create_machine("david", "desktop", "PUBKEY-A").await.is_err());
845    }
846
847    #[tokio::test]
848    async fn machine_name_is_unique_within_a_user_but_not_across_users() {
849        let store = store().await;
850        store.create_machine("aaron", "workstation", "PUBKEY-A").await.unwrap();
851
852        // Same name, same user, different key -> rejected.
853        assert!(store.create_machine("aaron", "workstation", "PUBKEY-B").await.is_err());
854        // Same name under a different user -> allowed.
855        store.create_machine("david", "workstation", "PUBKEY-C").await.unwrap();
856    }
857
858    #[tokio::test]
859    async fn machines_list_and_delete_for_a_user() {
860        let store = store().await;
861        store.create_machine("aaron", "workstation", "PUBKEY-A").await.unwrap();
862        store.create_machine("aaron", "sno-box", "PUBKEY-B").await.unwrap();
863
864        assert_eq!(store.list_machines("aaron").await.unwrap().len(), 2);
865
866        store.delete_machine("aaron", "sno-box").await.unwrap();
867        let remaining = store.list_machines("aaron").await.unwrap();
868        assert_eq!(remaining.len(), 1);
869        assert_eq!(remaining[0].name, "workstation");
870    }
871
872    #[tokio::test]
873    async fn channel_create_fetch_and_unique_name() {
874        let store = store().await;
875        let created = store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
876
877        assert_eq!(created.visibility, "private");
878        assert_eq!(store.get_channel("ops").await.unwrap(), Some(created));
879        assert!(store.create_channel("ops", Visibility::Public, "david").await.is_err());
880    }
881
882    #[tokio::test]
883    async fn invite_create_fetch_and_unique_token() {
884        let store = store().await;
885        let created = store.create_invite("ops", "tok-123", Some(5), None, "aaron").await.unwrap();
886
887        assert_eq!(store.get_invite("tok-123").await.unwrap(), Some(created));
888        assert!(store.create_invite("ops", "tok-123", None, None, "aaron").await.is_err());
889    }
890
891    #[tokio::test]
892    async fn channel_membership_add_remove_and_list() {
893        let store = store().await;
894        store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
895        // The creator is seeded as the first member.
896        assert!(store.is_channel_member("ops", "aaron").await.unwrap());
897
898        store.add_channel_member("ops", "david").await.unwrap();
899        // Idempotent: re-adding an existing member is a no-op, not a duplicate or an error.
900        store.add_channel_member("ops", "david").await.unwrap();
901        assert!(store.is_channel_member("ops", "david").await.unwrap());
902
903        let mut members = store.list_channel_members("ops").await.unwrap();
904        members.sort();
905        assert_eq!(members, vec!["aaron".to_owned(), "david".to_owned()]);
906        assert_eq!(store.list_user_memberships("david").await.unwrap(), vec!["ops".to_owned()]);
907
908        store.remove_channel_member("ops", "david").await.unwrap();
909        assert!(!store.is_channel_member("ops", "david").await.unwrap());
910    }
911
912    #[tokio::test]
913    async fn channel_memberships_follow_delete_and_rename() {
914        let store = store().await;
915        store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
916        store.add_channel_member("ops", "david").await.unwrap();
917
918        // Rename migrates memberships to the new channel name.
919        store.rename_channel("ops", "operations").await.unwrap();
920        assert!(store.is_channel_member("operations", "david").await.unwrap());
921        assert!(!store.is_channel_member("ops", "david").await.unwrap());
922
923        // Deleting a channel drops its memberships, so a future same-named channel cannot inherit them.
924        store.delete_channel("operations").await.unwrap();
925        assert!(store.list_channel_members("operations").await.unwrap().is_empty());
926    }
927
928    #[tokio::test]
929    async fn channel_visibility_can_be_changed() {
930        let store = store().await;
931        store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
932
933        store.set_channel_visibility("ops", Visibility::Public).await.unwrap();
934
935        assert_eq!(store.get_channel("ops").await.unwrap().unwrap().visibility, "public");
936    }
937
938    #[tokio::test]
939    async fn channel_rename_moves_the_record_and_respects_uniqueness() {
940        let store = store().await;
941        store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
942        store.create_channel("taken", Visibility::Public, "aaron").await.unwrap();
943
944        store.rename_channel("ops", "operations").await.unwrap();
945        assert!(store.get_channel("ops").await.unwrap().is_none());
946        assert!(store.get_channel("operations").await.unwrap().is_some());
947
948        // Renaming onto an existing name is rejected by the unique index.
949        assert!(store.rename_channel("operations", "taken").await.is_err());
950    }
951
952    #[tokio::test]
953    async fn channel_can_be_deleted_and_listed() {
954        let store = store().await;
955        store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
956        store.create_channel("lobby", Visibility::Public, "aaron").await.unwrap();
957
958        assert_eq!(store.list_channels().await.unwrap().len(), 2);
959
960        store.delete_channel("ops").await.unwrap();
961        let remaining = store.list_channels().await.unwrap();
962        assert_eq!(remaining.len(), 1);
963        assert_eq!(remaining[0].name, "lobby");
964    }
965
966    #[tokio::test]
967    async fn invite_uses_can_be_decremented_and_revoked() {
968        let store = store().await;
969        store.create_invite("ops", "tok-123", Some(5), None, "aaron").await.unwrap();
970
971        store.set_invite_uses("tok-123", 4).await.unwrap();
972        assert_eq!(store.get_invite("tok-123").await.unwrap().unwrap().uses_remaining, Some(4));
973
974        store.delete_invite("tok-123").await.unwrap();
975        assert!(store.get_invite("tok-123").await.unwrap().is_none());
976    }
977
978    #[tokio::test]
979    async fn invites_are_dropped_when_the_channel_is_deleted() {
980        let store = store().await;
981        store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
982        store.create_invite("ops", "tok", Some(5), None, "aaron").await.unwrap();
983
984        store.delete_channel("ops").await.unwrap();
985        assert!(
986            store.get_invite("tok").await.unwrap().is_none(),
987            "deleting a channel must drop its invites so a future same-named channel cannot honor them"
988        );
989    }
990
991    #[tokio::test]
992    async fn invites_follow_a_channel_rename() {
993        let store = store().await;
994        store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
995        store.create_invite("ops", "tok", None, None, "aaron").await.unwrap();
996
997        store.rename_channel("ops", "operations").await.unwrap();
998        assert_eq!(store.get_invite("tok").await.unwrap().unwrap().channel, "operations", "an invite must follow its renamed channel");
999    }
1000
1001    #[tokio::test]
1002    async fn ban_add_remove_and_list_round_trip() {
1003        let store = store().await;
1004        store.create_channel("ops", Visibility::Public, "aaron").await.unwrap();
1005
1006        store.add_ban("ops", "bob").await.unwrap();
1007        store.add_ban("ops", "bob").await.unwrap(); // idempotent
1008        store.add_ban("ops", "mallory").await.unwrap();
1009
1010        let mut bans = store.list_bans().await.unwrap();
1011        bans.sort();
1012        assert_eq!(bans, vec![("ops".to_owned(), "bob".to_owned()), ("ops".to_owned(), "mallory".to_owned())]);
1013
1014        store.remove_ban("ops", "bob").await.unwrap();
1015        assert_eq!(store.list_bans().await.unwrap(), vec![("ops".to_owned(), "mallory".to_owned())]);
1016    }
1017
1018    #[tokio::test]
1019    async fn bans_follow_delete_and_rename() {
1020        let store = store().await;
1021        store.create_channel("ops", Visibility::Public, "aaron").await.unwrap();
1022        store.add_ban("ops", "bob").await.unwrap();
1023
1024        // A rename migrates the ban with the channel.
1025        store.rename_channel("ops", "operations").await.unwrap();
1026        assert_eq!(store.list_bans().await.unwrap(), vec![("operations".to_owned(), "bob".to_owned())]);
1027
1028        // A delete drops the channel's bans so a future same-named channel starts clean.
1029        store.delete_channel("operations").await.unwrap();
1030        assert!(store.list_bans().await.unwrap().is_empty(), "deleting a channel must drop its bans");
1031    }
1032
1033    #[tokio::test]
1034    async fn users_can_be_listed_and_deleted() {
1035        let store = store().await;
1036        store.create_user("aaron").await.unwrap();
1037        store.create_user("david").await.unwrap();
1038
1039        assert_eq!(store.list_users().await.unwrap().len(), 2);
1040
1041        store.delete_user("david").await.unwrap();
1042        let remaining = store.list_users().await.unwrap();
1043        assert_eq!(remaining.len(), 1);
1044        assert_eq!(remaining[0].username, "aaron");
1045    }
1046}