Skip to main content

conclavelib/
store.rs

1//! Embedded `SurrealDB` store: durable config only, behind a thin per-table repository.
2//!
3//! The store is `SurrealDB` **embedded** — the official SDK with a local KV backend, so
4//! `conclave serve` stays a single self-contained binary with a data directory and no external DB
5//! process (DESIGN.md §15). [`Store::open`] uses the pure-Rust `SurrealKV` backend for persistence;
6//! [`Store::open_in_memory`] backs hermetic tests.
7//!
//! There is no ORM: storage records are the SDK's own typed layer (`SurrealValue`), and this module
//! maps between them and the domain types. Only durable config lives here (`user`, `machine`,
//! `channel`, `invite`, `membership`, and `ban`, with the uniqueness constraints from DESIGN.md
//! §15); presence, subscriptions, permission levels, and the admin allowlist are deliberately not
//! persisted.
12
13use std::path::Path;
14
15use anyhow::Context as _;
16use surrealdb::{
17    Surreal,
18    engine::local::{Db, Mem, SurrealKv},
19    types::{SurrealValue, Value},
20};
21
22use crate::base::{Res, Visibility, Void};
23
/// The single namespace the embedded store selects when it is opened.
const NAMESPACE: &str = "conclave";
/// The single database (inside [`NAMESPACE`]) the embedded store selects when it is opened.
const DATABASE: &str = "conclave";
27
/// Schema definition run at open: the uniqueness constraints from DESIGN.md §15. `IF NOT EXISTS`
/// keeps re-opening a persistent store idempotent.
///
/// One unique index is defined per natural key: `user.username`, `machine.pubkey`,
/// `machine.(user, name)`, `channel.name`, `invite.token`, `membership.(channel, user)`, and
/// `ban.(channel, user)`.
const SCHEMA: &str = "\
DEFINE INDEX IF NOT EXISTS user_username ON user FIELDS username UNIQUE;
DEFINE INDEX IF NOT EXISTS machine_pubkey ON machine FIELDS pubkey UNIQUE;
DEFINE INDEX IF NOT EXISTS machine_user_name ON machine FIELDS user, name UNIQUE;
DEFINE INDEX IF NOT EXISTS channel_name ON channel FIELDS name UNIQUE;
DEFINE INDEX IF NOT EXISTS invite_token ON invite FIELDS token UNIQUE;
DEFINE INDEX IF NOT EXISTS membership_channel_user ON membership FIELDS channel, user UNIQUE;
DEFINE INDEX IF NOT EXISTS ban_channel_user ON ban FIELDS channel, user UNIQUE;
";
39
/// A registered account (`username` unique per server, DESIGN.md §15).
///
/// Stored in the `user` table; the `user_username` unique index rejects a second record carrying
/// the same `username`.
#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
pub struct UserRecord {
    /// The account name.
    pub username: String,
    /// RFC 3339 creation timestamp.
    pub created_at: String,
}
48
/// An enrolled machine keypair under a user (`pubkey` globally unique; `name` unique within the
/// user, DESIGN.md §5, §15).
///
/// Stored in the `machine` table; the `machine_pubkey` and `machine_user_name` unique indexes
/// enforce the two constraints.
#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
pub struct MachineRecord {
    /// The owning username.
    pub user: String,
    /// The machine name (unique within the user).
    pub name: String,
    /// The machine's public key, base64-encoded (globally unique).
    pub pubkey: String,
    /// RFC 3339 enrollment timestamp.
    pub added_at: String,
}
62
/// A channel (`name` unique, DESIGN.md §6, §15). Membership (the ACL) is normalized into the
/// `membership` table rather than an embedded array, so concurrent joins insert distinct records
/// instead of contending on one row (PRD-0007 T-003).
///
/// Stored in the `channel` table; the `channel_name` unique index enforces name uniqueness.
#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
pub struct ChannelRecord {
    /// The channel name.
    pub name: String,
    /// The visibility tier token (see [`Visibility::as_str`]).
    pub visibility: String,
    /// The creating (and administering) user.
    pub created_by: String,
    /// RFC 3339 creation timestamp.
    pub created_at: String,
}
77
/// An invite token for a channel (`token` unique, DESIGN.md §6, §15).
///
/// Stored in the `invite` table; the `invite_token` unique index enforces token uniqueness.
#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
pub struct InviteRecord {
    /// The channel the token grants access to.
    pub channel: String,
    /// The opaque token string.
    pub token: String,
    /// Remaining redemptions, or unlimited if absent.
    pub uses_remaining: Option<i64>,
    /// RFC 3339 expiry, or non-expiring if absent.
    pub expires_at: Option<String>,
    /// The creating user.
    pub created_by: String,
}
92
93// Query variable bindings (SurrealDB 3.x binds a `SurrealValue` object of variables).
94
/// Binds `$username` for the `user` table lookup and delete keyed by username.
#[derive(SurrealValue)]
struct ByUsername {
    username: String,
}
99
/// Binds `$pubkey` for the `machine` lookup keyed by base64 public key.
#[derive(SurrealValue)]
struct ByPubkey {
    pubkey: String,
}
104
/// Binds `$user` for queries filtered by a single username (machines, memberships, created
/// channels).
#[derive(SurrealValue)]
struct ByUser {
    user: String,
}
109
/// Binds `$name` for `channel` queries keyed by channel name.
#[derive(SurrealValue)]
struct ByName {
    name: String,
}
114
/// Binds `$tok` for `invite` queries keyed by token.
#[derive(SurrealValue)]
struct ByToken {
    // `token` is a protected variable name in SurrealQL, so bind under `tok`.
    tok: String,
}
120
/// Binds `$user` and `$name` for the `machine` delete keyed by `(user, name)`.
#[derive(SurrealValue)]
struct ByUserAndName {
    user: String,
    name: String,
}
126
/// Binds `$name` (the channel to update) and `$visibility` (the new tier token).
#[derive(SurrealValue)]
struct SetVisibility {
    name: String,
    visibility: String,
}
132
/// Binds `$old` and `$new` channel names for the rename statements.
#[derive(SurrealValue)]
struct Rename {
    old: String,
    new: String,
}
138
/// Binds `$tok` (the invite token) and `$uses` (the new remaining-redemption count).
#[derive(SurrealValue)]
struct SetUses {
    // `token` is a protected variable name in SurrealQL, so bind under `tok`.
    tok: String,
    uses: i64,
}
145
/// A `(channel, user)` pair: binds `$channel` and `$user` for `membership` and `ban` statements,
/// and is also the decoded row shape when bans are listed.
#[derive(SurrealValue)]
struct Membership {
    channel: String,
    user: String,
}
151
/// Binds `$channel` for queries filtered by a single channel name.
#[derive(SurrealValue)]
struct ByChannel {
    channel: String,
}
156
/// A bounded cap on optimistic-concurrency retries: high enough to clear realistic contention on a
/// single channel record, low enough that a genuinely stuck write still surfaces (DESIGN.md §15).
///
/// This is the total number of attempts a retrying writer makes (the first try included), not
/// the number of retries after it.
const MAX_WRITE_ATTEMPTS: usize = 64;
160
161/// Whether a `SurrealDB` error is an optimistic-concurrency write-write conflict (`SurrealKV`
162/// surfaces `TransactionWriteConflict`; `SurrealDB` maps it to `TransactionConflict`, both rendering
163/// with "conflict"). These are expected under concurrent load and must be retried per `SurrealDB`'s
164/// optimistic-concurrency contract — the loser of a same-key write re-applies its statement — rather
165/// than serialized behind an application lock (DESIGN.md §15).
166fn is_write_conflict(err: &surrealdb::Error) -> bool {
167    err.to_string().to_lowercase().contains("conflict")
168}
169
/// The embedded store: a thin typed repository over an embedded `SurrealDB` instance. Cloning
/// yields another handle to the same database (the inner client is a shared handle).
#[derive(Clone)]
pub struct Store {
    /// The embedded `SurrealDB` client every repository method issues its queries through.
    db: Surreal<Db>,
}
176
177impl Store {
178    /// Opens (or creates) a persistent store rooted at `path` using the `SurrealKV` backend.
179    ///
180    /// # Errors
181    ///
182    /// Returns an error if the backend cannot be opened or the schema cannot be applied.
183    pub async fn open(path: &Path) -> Res<Self> {
184        let db = Surreal::new::<SurrealKv>(path.to_string_lossy().as_ref()).await.context("failed to open the embedded store")?;
185        Self::init(db).await
186    }
187
188    /// Opens an ephemeral in-memory store (for tests).
189    ///
190    /// # Errors
191    ///
192    /// Returns an error if the in-memory backend cannot be initialized.
193    pub async fn open_in_memory() -> Res<Self> {
194        let db = Surreal::new::<Mem>(()).await.context("failed to open the in-memory store")?;
195        Self::init(db).await
196    }
197
198    async fn init(db: Surreal<Db>) -> Res<Self> {
199        db.use_ns(NAMESPACE).use_db(DATABASE).await.context("failed to select namespace/database")?;
200        db.query(SCHEMA).await.context("failed to apply schema")?.check().context("schema application reported an error")?;
201        Ok(Self { db })
202    }
203
204    async fn insert<T: SurrealValue>(&self, table: &str, record: T) -> Void {
205        let _created: Option<Value> = self.db.create(table.to_owned()).content(record).await.with_context(|| format!("failed to insert into `{table}`"))?;
206        Ok(())
207    }
208
209    /// Creates a user, enforcing the unique-username constraint.
210    ///
211    /// # Errors
212    ///
213    /// Returns an error if the username is already taken or the write fails.
214    pub async fn create_user(&self, username: &str) -> Res<UserRecord> {
215        let record = UserRecord {
216            username: username.to_owned(),
217            created_at: now_rfc3339(),
218        };
219        self.insert("user", record.clone()).await?;
220        Ok(record)
221    }
222
223    /// Fetches a user by username.
224    ///
225    /// # Errors
226    ///
227    /// Returns an error if the query fails.
228    pub async fn get_user(&self, username: &str) -> Res<Option<UserRecord>> {
229        let mut response = self
230            .db
231            .query("SELECT * OMIT id FROM user WHERE username = $username")
232            .bind(ByUsername { username: username.to_owned() })
233            .await
234            .context("failed to query user")?;
235        let rows: Vec<UserRecord> = response.take(0).context("failed to decode user rows")?;
236        Ok(rows.into_iter().next())
237    }
238
239    /// Enrolls a machine, enforcing the globally-unique pubkey and per-user-unique name constraints.
240    ///
241    /// # Errors
242    ///
243    /// Returns an error if the pubkey is already enrolled, the name collides within the user, or the
244    /// write fails.
245    pub async fn create_machine(&self, user: &str, name: &str, pubkey_base64: &str) -> Res<MachineRecord> {
246        let record = MachineRecord {
247            user: user.to_owned(),
248            name: name.to_owned(),
249            pubkey: pubkey_base64.to_owned(),
250            added_at: now_rfc3339(),
251        };
252        self.insert("machine", record.clone()).await?;
253        Ok(record)
254    }
255
256    /// Fetches a machine by its base64 public key.
257    ///
258    /// # Errors
259    ///
260    /// Returns an error if the query fails.
261    pub async fn get_machine_by_pubkey(&self, pubkey_base64: &str) -> Res<Option<MachineRecord>> {
262        let mut response = self
263            .db
264            .query("SELECT * OMIT id FROM machine WHERE pubkey = $pubkey")
265            .bind(ByPubkey { pubkey: pubkey_base64.to_owned() })
266            .await
267            .context("failed to query machine")?;
268        let rows: Vec<MachineRecord> = response.take(0).context("failed to decode machine rows")?;
269        Ok(rows.into_iter().next())
270    }
271
272    /// Lists the machines enrolled under a user.
273    ///
274    /// # Errors
275    ///
276    /// Returns an error if the query fails.
277    pub async fn list_machines(&self, user: &str) -> Res<Vec<MachineRecord>> {
278        let mut response = self
279            .db
280            .query("SELECT * OMIT id FROM machine WHERE user = $user")
281            .bind(ByUser { user: user.to_owned() })
282            .await
283            .context("failed to list machines")?;
284        response.take(0).context("failed to decode machine rows")
285    }
286
287    /// Revokes a machine by `(user, name)`.
288    ///
289    /// # Errors
290    ///
291    /// Returns an error if the delete fails.
292    pub async fn delete_machine(&self, user: &str, name: &str) -> Void {
293        self.db
294            .query("DELETE machine WHERE user = $user AND name = $name")
295            .bind(ByUserAndName {
296                user: user.to_owned(),
297                name: name.to_owned(),
298            })
299            .await
300            .context("failed to delete machine")?
301            .check()
302            .context("machine delete reported an error")?;
303        Ok(())
304    }
305
306    /// Creates a channel, enforcing the unique-name constraint.
307    ///
308    /// # Errors
309    ///
310    /// Returns an error if the name is already taken or the write fails.
311    pub async fn create_channel(&self, name: &str, visibility: Visibility, created_by: &str) -> Res<ChannelRecord> {
312        let record = ChannelRecord {
313            name: name.to_owned(),
314            visibility: visibility.as_str().to_owned(),
315            created_by: created_by.to_owned(),
316            created_at: now_rfc3339(),
317        };
318        self.insert("channel", record.clone()).await?;
319        // The creator is the channel's first member (it also administers via `created_by`).
320        self.add_channel_member(name, created_by).await?;
321        Ok(record)
322    }
323
324    /// Fetches a channel by name.
325    ///
326    /// # Errors
327    ///
328    /// Returns an error if the query fails.
329    pub async fn get_channel(&self, name: &str) -> Res<Option<ChannelRecord>> {
330        let mut response = self
331            .db
332            .query("SELECT * OMIT id FROM channel WHERE name = $name")
333            .bind(ByName { name: name.to_owned() })
334            .await
335            .context("failed to query channel")?;
336        let rows: Vec<ChannelRecord> = response.take(0).context("failed to decode channel rows")?;
337        Ok(rows.into_iter().next())
338    }
339
340    /// Creates an invite token, enforcing the unique-token constraint.
341    ///
342    /// # Errors
343    ///
344    /// Returns an error if the token already exists or the write fails.
345    pub async fn create_invite(&self, channel: &str, token: &str, uses_remaining: Option<i64>, expires_at: Option<String>, created_by: &str) -> Res<InviteRecord> {
346        let record = InviteRecord {
347            channel: channel.to_owned(),
348            token: token.to_owned(),
349            uses_remaining,
350            expires_at,
351            created_by: created_by.to_owned(),
352        };
353        self.insert("invite", record.clone()).await?;
354        Ok(record)
355    }
356
357    /// Fetches an invite by token.
358    ///
359    /// # Errors
360    ///
361    /// Returns an error if the query fails.
362    pub async fn get_invite(&self, token: &str) -> Res<Option<InviteRecord>> {
363        let mut response = self
364            .db
365            .query("SELECT * OMIT id FROM invite WHERE token = $tok")
366            .bind(ByToken { tok: token.to_owned() })
367            .await
368            .context("failed to query invite")?;
369        let rows: Vec<InviteRecord> = response.take(0).context("failed to decode invite rows")?;
370        Ok(rows.into_iter().next())
371    }
372
373    /// Lists every channel; the caller applies visibility / membership gating (DESIGN.md §6).
374    ///
375    /// # Errors
376    ///
377    /// Returns an error if the query fails.
378    pub async fn list_channels(&self) -> Res<Vec<ChannelRecord>> {
379        let mut response = self.db.query("SELECT * OMIT id FROM channel").await.context("failed to list channels")?;
380        response.take(0).context("failed to decode channel rows")
381    }
382
383    /// Adds `user` to a channel's membership (its ACL), idempotently. Each membership is its own
384    /// record under the unique `(channel, user)` index, so concurrent adds of different users write
385    /// distinct keys and never contend on a shared row (PRD-0007 T-003); a conflict on the same pair
386    /// is retried per `SurrealDB`'s optimistic-concurrency contract.
387    ///
388    /// # Errors
389    ///
390    /// Returns an error if the write keeps conflicting past the retry cap or otherwise fails.
391    pub async fn add_channel_member(&self, channel: &str, user: &str) -> Void {
392        for attempt in 0..MAX_WRITE_ATTEMPTS {
393            let outcome = self
394                .db
395                .query("INSERT INTO membership { channel: $channel, user: $user } ON DUPLICATE KEY UPDATE channel = $channel")
396                .bind(Membership {
397                    channel: channel.to_owned(),
398                    user: user.to_owned(),
399                })
400                .await
401                .and_then(surrealdb::IndexedResults::check);
402            match outcome {
403                Ok(_) => return Ok(()),
404                Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
405                Err(e) => return Err(anyhow::Error::new(e).context("failed to add channel member")),
406            }
407        }
408        anyhow::bail!("adding a channel member exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
409    }
410
411    /// Removes `user` from a channel's membership; idempotent (removing a non-member is a no-op).
412    ///
413    /// # Errors
414    ///
415    /// Returns an error if the delete keeps conflicting past the retry cap or otherwise fails.
416    pub async fn remove_channel_member(&self, channel: &str, user: &str) -> Void {
417        for attempt in 0..MAX_WRITE_ATTEMPTS {
418            let outcome = self
419                .db
420                .query("DELETE membership WHERE channel = $channel AND user = $user")
421                .bind(Membership {
422                    channel: channel.to_owned(),
423                    user: user.to_owned(),
424                })
425                .await
426                .and_then(surrealdb::IndexedResults::check);
427            match outcome {
428                Ok(_) => return Ok(()),
429                Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
430                Err(e) => return Err(anyhow::Error::new(e).context("failed to remove channel member")),
431            }
432        }
433        anyhow::bail!("removing a channel member exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
434    }
435
436    /// Whether `user` is a member of `channel`.
437    ///
438    /// # Errors
439    ///
440    /// Returns an error if the query fails.
441    pub async fn is_channel_member(&self, channel: &str, user: &str) -> Res<bool> {
442        let mut response = self
443            .db
444            .query("SELECT VALUE user FROM membership WHERE channel = $channel AND user = $user")
445            .bind(Membership {
446                channel: channel.to_owned(),
447                user: user.to_owned(),
448            })
449            .await
450            .context("failed to query membership")?;
451        let rows: Vec<String> = response.take(0).context("failed to decode membership rows")?;
452        Ok(!rows.is_empty())
453    }
454
455    /// The channels `user` is a member of (for discovery gating, DESIGN.md §6).
456    ///
457    /// # Errors
458    ///
459    /// Returns an error if the query fails.
460    pub async fn list_user_memberships(&self, user: &str) -> Res<Vec<String>> {
461        let mut response = self
462            .db
463            .query("SELECT VALUE channel FROM membership WHERE user = $user")
464            .bind(ByUser { user: user.to_owned() })
465            .await
466            .context("failed to list user memberships")?;
467        response.take(0).context("failed to decode membership channels")
468    }
469
470    /// The members of a channel (its ACL users).
471    ///
472    /// # Errors
473    ///
474    /// Returns an error if the query fails.
475    pub async fn list_channel_members(&self, channel: &str) -> Res<Vec<String>> {
476        let mut response = self
477            .db
478            .query("SELECT VALUE user FROM membership WHERE channel = $channel")
479            .bind(ByChannel { channel: channel.to_owned() })
480            .await
481            .context("failed to list channel members")?;
482        response.take(0).context("failed to decode membership users")
483    }
484
485    /// Records a channel ban for `user`; idempotent (banning twice is a no-op). Durable so bans
486    /// survive a server restart; the hub mirrors them in memory for its lock-guarded checks.
487    ///
488    /// # Errors
489    ///
490    /// Returns an error if the insert keeps conflicting past the retry cap or otherwise fails.
491    pub async fn add_ban(&self, channel: &str, user: &str) -> Void {
492        for attempt in 0..MAX_WRITE_ATTEMPTS {
493            let outcome = self
494                .db
495                .query("INSERT INTO ban { channel: $channel, user: $user } ON DUPLICATE KEY UPDATE channel = $channel")
496                .bind(Membership {
497                    channel: channel.to_owned(),
498                    user: user.to_owned(),
499                })
500                .await
501                .and_then(surrealdb::IndexedResults::check);
502            match outcome {
503                Ok(_) => return Ok(()),
504                Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
505                Err(e) => return Err(anyhow::Error::new(e).context("failed to add ban")),
506            }
507        }
508        anyhow::bail!("adding a ban exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
509    }
510
511    /// Lifts a channel ban; idempotent (removing an absent ban is a no-op).
512    ///
513    /// # Errors
514    ///
515    /// Returns an error if the delete keeps conflicting past the retry cap or otherwise fails.
516    pub async fn remove_ban(&self, channel: &str, user: &str) -> Void {
517        for attempt in 0..MAX_WRITE_ATTEMPTS {
518            let outcome = self
519                .db
520                .query("DELETE ban WHERE channel = $channel AND user = $user")
521                .bind(Membership {
522                    channel: channel.to_owned(),
523                    user: user.to_owned(),
524                })
525                .await
526                .and_then(surrealdb::IndexedResults::check);
527            match outcome {
528                Ok(_) => return Ok(()),
529                Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
530                Err(e) => return Err(anyhow::Error::new(e).context("failed to remove ban")),
531            }
532        }
533        anyhow::bail!("removing a ban exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
534    }
535
536    /// Every persisted `(channel, user)` ban, for loading the hub's in-memory view at startup.
537    ///
538    /// # Errors
539    ///
540    /// Returns an error if the query fails.
541    pub async fn list_bans(&self) -> Res<Vec<(String, String)>> {
542        let mut response = self.db.query("SELECT channel, user FROM ban").await.context("failed to list bans")?;
543        let rows: Vec<Membership> = response.take(0).context("failed to decode ban rows")?;
544        Ok(rows.into_iter().map(|row| (row.channel, row.user)).collect())
545    }
546
547    /// The names of the channels created (and administered) by `user`.
548    ///
549    /// # Errors
550    ///
551    /// Returns an error if the query fails.
552    pub async fn list_channels_created_by(&self, user: &str) -> Res<Vec<String>> {
553        let mut response = self
554            .db
555            .query("SELECT VALUE name FROM channel WHERE created_by = $user")
556            .bind(ByUser { user: user.to_owned() })
557            .await
558            .context("failed to list created channels")?;
559        response.take(0).context("failed to decode channel names")
560    }
561
562    /// Removes every membership held by `user` (used when a user is removed, DESIGN.md §7).
563    ///
564    /// # Errors
565    ///
566    /// Returns an error if the delete fails.
567    pub async fn delete_user_memberships(&self, user: &str) -> Void {
568        self.db
569            .query("DELETE membership WHERE user = $user")
570            .bind(ByUser { user: user.to_owned() })
571            .await
572            .context("failed to delete user memberships")?
573            .check()
574            .context("user membership delete reported an error")?;
575        Ok(())
576    }
577
578    /// Changes a channel's visibility tier.
579    ///
580    /// # Errors
581    ///
582    /// Returns an error if the update fails.
583    pub async fn set_channel_visibility(&self, name: &str, visibility: Visibility) -> Void {
584        self.db
585            .query("UPDATE channel SET visibility = $visibility WHERE name = $name")
586            .bind(SetVisibility {
587                name: name.to_owned(),
588                visibility: visibility.as_str().to_owned(),
589            })
590            .await
591            .context("failed to update channel visibility")?
592            .check()
593            .context("channel visibility update reported an error")?;
594        Ok(())
595    }
596
597    /// Renames a channel, enforcing the unique-name constraint on the new name.
598    ///
599    /// # Errors
600    ///
601    /// Returns an error if the new name is already taken or the update fails.
602    pub async fn rename_channel(&self, old: &str, new: &str) -> Void {
603        self.db
604            .query("UPDATE channel SET name = $new WHERE name = $old")
605            .bind(Rename { old: old.to_owned(), new: new.to_owned() })
606            .await
607            .context("failed to rename channel")?
608            .check()
609            .context("channel rename reported an error")?;
610        // Keep memberships and invites attached to the renamed channel (PRD-0007 T-004).
611        self.db
612            .query("UPDATE membership SET channel = $new WHERE channel = $old")
613            .bind(Rename { old: old.to_owned(), new: new.to_owned() })
614            .await
615            .context("failed to migrate channel memberships")?
616            .check()
617            .context("membership rename reported an error")?;
618        self.db
619            .query("UPDATE invite SET channel = $new WHERE channel = $old")
620            .bind(Rename { old: old.to_owned(), new: new.to_owned() })
621            .await
622            .context("failed to migrate channel invites")?
623            .check()
624            .context("invite rename reported an error")?;
625        self.db
626            .query("UPDATE ban SET channel = $new WHERE channel = $old")
627            .bind(Rename { old: old.to_owned(), new: new.to_owned() })
628            .await
629            .context("failed to migrate channel bans")?
630            .check()
631            .context("ban rename reported an error")?;
632        Ok(())
633    }
634
635    /// Deletes a channel.
636    ///
637    /// # Errors
638    ///
639    /// Returns an error if the delete fails.
640    pub async fn delete_channel(&self, name: &str) -> Void {
641        self.db
642            .query("DELETE channel WHERE name = $name")
643            .bind(ByName { name: name.to_owned() })
644            .await
645            .context("failed to delete channel")?
646            .check()
647            .context("channel delete reported an error")?;
648        // Drop the channel's memberships and invites so a future same-named channel cannot inherit
649        // them (invite cascade — PRD-0007 T-004, finding #5).
650        self.db
651            .query("DELETE membership WHERE channel = $channel")
652            .bind(ByChannel { channel: name.to_owned() })
653            .await
654            .context("failed to delete channel memberships")?
655            .check()
656            .context("membership delete reported an error")?;
657        self.db
658            .query("DELETE invite WHERE channel = $channel")
659            .bind(ByChannel { channel: name.to_owned() })
660            .await
661            .context("failed to delete channel invites")?
662            .check()
663            .context("invite delete reported an error")?;
664        self.db
665            .query("DELETE ban WHERE channel = $channel")
666            .bind(ByChannel { channel: name.to_owned() })
667            .await
668            .context("failed to delete channel bans")?
669            .check()
670            .context("ban delete reported an error")?;
671        Ok(())
672    }
673
674    /// Sets an invite's remaining redemptions (used when redeeming a limited-use token).
675    ///
676    /// # Errors
677    ///
678    /// Returns an error if the update fails.
679    pub async fn set_invite_uses(&self, token: &str, uses_remaining: i64) -> Void {
680        self.db
681            .query("UPDATE invite SET uses_remaining = $uses WHERE token = $tok")
682            .bind(SetUses {
683                tok: token.to_owned(),
684                uses: uses_remaining,
685            })
686            .await
687            .context("failed to update invite uses")?
688            .check()
689            .context("invite uses update reported an error")?;
690        Ok(())
691    }
692
    /// Atomically consumes one redemption of a limited-use invite: decrements `uses_remaining` only
    /// while it is positive, returning whether a use was claimed. The guarded single-statement update
    /// (retried on an optimistic-concurrency conflict) makes concurrent redeemers of the last use
    /// mutually exclusive, so a single-use token admits exactly one (PRD-0007 T-003). The caller
    /// handles unlimited (`None`) tokens and expiry; an exhausted token is deleted.
    ///
    /// Returns `Ok(true)` when this call claimed a use and `Ok(false)` when no invite with a
    /// positive `uses_remaining` matched `token`.
    ///
    /// # Errors
    ///
    /// Returns an error if the update keeps conflicting past the retry cap or otherwise fails.
    pub async fn try_consume_invite_use(&self, token: &str) -> Res<bool> {
        for attempt in 0..MAX_WRITE_ATTEMPTS {
            let outcome = self
                .db
                // Guard and decrement share one statement; `RETURN VALUE` yields the post-decrement
                // count for each row that was updated.
                .query("UPDATE invite SET uses_remaining = uses_remaining - 1 WHERE token = $tok AND uses_remaining > 0 RETURN VALUE uses_remaining")
                .bind(ByToken { tok: token.to_owned() })
                .await
                .and_then(surrealdb::IndexedResults::check);
            match outcome {
                Ok(mut response) => {
                    let remaining: Vec<i64> = response.take(0).context("failed to decode invite uses")?;
                    return match remaining.into_iter().next() {
                        // No positive-use row matched — the token was already spent (or removed).
                        None => Ok(false),
                        // This redemption took the last use; delete the spent token.
                        // NOTE(review): if this delete fails, the error propagates even though the
                        // use was already claimed above — confirm callers tolerate that.
                        Some(0) => {
                            self.delete_invite(token).await?;
                            Ok(true)
                        }
                        // A use was consumed with more remaining.
                        Some(_) => Ok(true),
                    };
                }
                // Retry only a write conflict, and only while another attempt is still permitted.
                Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
                Err(e) => return Err(anyhow::Error::new(e).context("failed to consume invite use")),
            }
        }
        anyhow::bail!("consuming an invite use exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
    }
731
732    /// Deletes an invite token (on revoke or when an exhausted token is redeemed).
733    ///
734    /// # Errors
735    ///
736    /// Returns an error if the delete fails.
737    pub async fn delete_invite(&self, token: &str) -> Void {
738        self.db
739            .query("DELETE invite WHERE token = $tok")
740            .bind(ByToken { tok: token.to_owned() })
741            .await
742            .context("failed to delete invite")?
743            .check()
744            .context("invite delete reported an error")?;
745        Ok(())
746    }
747
748    /// Lists every registered user (server-admin `user list`).
749    ///
750    /// # Errors
751    ///
752    /// Returns an error if the query fails.
753    pub async fn list_users(&self) -> Res<Vec<UserRecord>> {
754        let mut response = self.db.query("SELECT * OMIT id FROM user").await.context("failed to list users")?;
755        response.take(0).context("failed to decode user rows")
756    }
757
758    /// Deletes a user (server-admin `user remove`); the caller also revokes the user's machines.
759    ///
760    /// # Errors
761    ///
762    /// Returns an error if the delete fails.
763    pub async fn delete_user(&self, username: &str) -> Void {
764        self.db
765            .query("DELETE user WHERE username = $username")
766            .bind(ByUsername { username: username.to_owned() })
767            .await
768            .context("failed to delete user")?
769            .check()
770            .context("user delete reported an error")?;
771        Ok(())
772    }
773}
774
775fn now_rfc3339() -> String {
776    chrono::Utc::now().to_rfc3339()
777}
778
779#[cfg(test)]
780mod tests {
781    // Tests relax `unwrap_used` (house convention; DESIGN.md §22).
782    #![allow(clippy::unwrap_used)]
783
784    use super::*;
785    use pretty_assertions::assert_eq;
786
787    async fn store() -> Store {
788        Store::open_in_memory().await.unwrap()
789    }
790
791    #[tokio::test]
792    async fn user_create_and_fetch_round_trip() {
793        let store = store().await;
794        let created = store.create_user("aaron").await.unwrap();
795
796        assert_eq!(store.get_user("aaron").await.unwrap(), Some(created));
797        assert_eq!(store.get_user("nobody").await.unwrap(), None);
798    }
799
800    #[tokio::test]
801    async fn duplicate_username_is_rejected() {
802        let store = store().await;
803        store.create_user("aaron").await.unwrap();
804
805        assert!(store.create_user("aaron").await.is_err(), "the unique-username constraint must reject a duplicate");
806    }
807
808    #[tokio::test]
809    async fn machine_pubkey_is_globally_unique() {
810        let store = store().await;
811        store.create_machine("aaron", "workstation", "PUBKEY-A").await.unwrap();
812
813        // Same pubkey under a different user/name must still be rejected.
814        assert!(store.create_machine("david", "desktop", "PUBKEY-A").await.is_err());
815    }
816
817    #[tokio::test]
818    async fn machine_name_is_unique_within_a_user_but_not_across_users() {
819        let store = store().await;
820        store.create_machine("aaron", "workstation", "PUBKEY-A").await.unwrap();
821
822        // Same name, same user, different key -> rejected.
823        assert!(store.create_machine("aaron", "workstation", "PUBKEY-B").await.is_err());
824        // Same name under a different user -> allowed.
825        store.create_machine("david", "workstation", "PUBKEY-C").await.unwrap();
826    }
827
828    #[tokio::test]
829    async fn machines_list_and_delete_for_a_user() {
830        let store = store().await;
831        store.create_machine("aaron", "workstation", "PUBKEY-A").await.unwrap();
832        store.create_machine("aaron", "sno-box", "PUBKEY-B").await.unwrap();
833
834        assert_eq!(store.list_machines("aaron").await.unwrap().len(), 2);
835
836        store.delete_machine("aaron", "sno-box").await.unwrap();
837        let remaining = store.list_machines("aaron").await.unwrap();
838        assert_eq!(remaining.len(), 1);
839        assert_eq!(remaining[0].name, "workstation");
840    }
841
842    #[tokio::test]
843    async fn channel_create_fetch_and_unique_name() {
844        let store = store().await;
845        let created = store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
846
847        assert_eq!(created.visibility, "private");
848        assert_eq!(store.get_channel("ops").await.unwrap(), Some(created));
849        assert!(store.create_channel("ops", Visibility::Public, "david").await.is_err());
850    }
851
852    #[tokio::test]
853    async fn invite_create_fetch_and_unique_token() {
854        let store = store().await;
855        let created = store.create_invite("ops", "tok-123", Some(5), None, "aaron").await.unwrap();
856
857        assert_eq!(store.get_invite("tok-123").await.unwrap(), Some(created));
858        assert!(store.create_invite("ops", "tok-123", None, None, "aaron").await.is_err());
859    }
860
861    #[tokio::test]
862    async fn channel_membership_add_remove_and_list() {
863        let store = store().await;
864        store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
865        // The creator is seeded as the first member.
866        assert!(store.is_channel_member("ops", "aaron").await.unwrap());
867
868        store.add_channel_member("ops", "david").await.unwrap();
869        // Idempotent: re-adding an existing member is a no-op, not a duplicate or an error.
870        store.add_channel_member("ops", "david").await.unwrap();
871        assert!(store.is_channel_member("ops", "david").await.unwrap());
872
873        let mut members = store.list_channel_members("ops").await.unwrap();
874        members.sort();
875        assert_eq!(members, vec!["aaron".to_owned(), "david".to_owned()]);
876        assert_eq!(store.list_user_memberships("david").await.unwrap(), vec!["ops".to_owned()]);
877
878        store.remove_channel_member("ops", "david").await.unwrap();
879        assert!(!store.is_channel_member("ops", "david").await.unwrap());
880    }
881
882    #[tokio::test]
883    async fn channel_memberships_follow_delete_and_rename() {
884        let store = store().await;
885        store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
886        store.add_channel_member("ops", "david").await.unwrap();
887
888        // Rename migrates memberships to the new channel name.
889        store.rename_channel("ops", "operations").await.unwrap();
890        assert!(store.is_channel_member("operations", "david").await.unwrap());
891        assert!(!store.is_channel_member("ops", "david").await.unwrap());
892
893        // Deleting a channel drops its memberships, so a future same-named channel cannot inherit them.
894        store.delete_channel("operations").await.unwrap();
895        assert!(store.list_channel_members("operations").await.unwrap().is_empty());
896    }
897
898    #[tokio::test]
899    async fn channel_visibility_can_be_changed() {
900        let store = store().await;
901        store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
902
903        store.set_channel_visibility("ops", Visibility::Public).await.unwrap();
904
905        assert_eq!(store.get_channel("ops").await.unwrap().unwrap().visibility, "public");
906    }
907
908    #[tokio::test]
909    async fn channel_rename_moves_the_record_and_respects_uniqueness() {
910        let store = store().await;
911        store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
912        store.create_channel("taken", Visibility::Public, "aaron").await.unwrap();
913
914        store.rename_channel("ops", "operations").await.unwrap();
915        assert!(store.get_channel("ops").await.unwrap().is_none());
916        assert!(store.get_channel("operations").await.unwrap().is_some());
917
918        // Renaming onto an existing name is rejected by the unique index.
919        assert!(store.rename_channel("operations", "taken").await.is_err());
920    }
921
922    #[tokio::test]
923    async fn channel_can_be_deleted_and_listed() {
924        let store = store().await;
925        store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
926        store.create_channel("lobby", Visibility::Public, "aaron").await.unwrap();
927
928        assert_eq!(store.list_channels().await.unwrap().len(), 2);
929
930        store.delete_channel("ops").await.unwrap();
931        let remaining = store.list_channels().await.unwrap();
932        assert_eq!(remaining.len(), 1);
933        assert_eq!(remaining[0].name, "lobby");
934    }
935
936    #[tokio::test]
937    async fn invite_uses_can_be_decremented_and_revoked() {
938        let store = store().await;
939        store.create_invite("ops", "tok-123", Some(5), None, "aaron").await.unwrap();
940
941        store.set_invite_uses("tok-123", 4).await.unwrap();
942        assert_eq!(store.get_invite("tok-123").await.unwrap().unwrap().uses_remaining, Some(4));
943
944        store.delete_invite("tok-123").await.unwrap();
945        assert!(store.get_invite("tok-123").await.unwrap().is_none());
946    }
947
948    #[tokio::test]
949    async fn invites_are_dropped_when_the_channel_is_deleted() {
950        let store = store().await;
951        store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
952        store.create_invite("ops", "tok", Some(5), None, "aaron").await.unwrap();
953
954        store.delete_channel("ops").await.unwrap();
955        assert!(
956            store.get_invite("tok").await.unwrap().is_none(),
957            "deleting a channel must drop its invites so a future same-named channel cannot honor them"
958        );
959    }
960
961    #[tokio::test]
962    async fn invites_follow_a_channel_rename() {
963        let store = store().await;
964        store.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
965        store.create_invite("ops", "tok", None, None, "aaron").await.unwrap();
966
967        store.rename_channel("ops", "operations").await.unwrap();
968        assert_eq!(store.get_invite("tok").await.unwrap().unwrap().channel, "operations", "an invite must follow its renamed channel");
969    }
970
971    #[tokio::test]
972    async fn ban_add_remove_and_list_round_trip() {
973        let store = store().await;
974        store.create_channel("ops", Visibility::Public, "aaron").await.unwrap();
975
976        store.add_ban("ops", "bob").await.unwrap();
977        store.add_ban("ops", "bob").await.unwrap(); // idempotent
978        store.add_ban("ops", "mallory").await.unwrap();
979
980        let mut bans = store.list_bans().await.unwrap();
981        bans.sort();
982        assert_eq!(bans, vec![("ops".to_owned(), "bob".to_owned()), ("ops".to_owned(), "mallory".to_owned())]);
983
984        store.remove_ban("ops", "bob").await.unwrap();
985        assert_eq!(store.list_bans().await.unwrap(), vec![("ops".to_owned(), "mallory".to_owned())]);
986    }
987
988    #[tokio::test]
989    async fn bans_follow_delete_and_rename() {
990        let store = store().await;
991        store.create_channel("ops", Visibility::Public, "aaron").await.unwrap();
992        store.add_ban("ops", "bob").await.unwrap();
993
994        // A rename migrates the ban with the channel.
995        store.rename_channel("ops", "operations").await.unwrap();
996        assert_eq!(store.list_bans().await.unwrap(), vec![("operations".to_owned(), "bob".to_owned())]);
997
998        // A delete drops the channel's bans so a future same-named channel starts clean.
999        store.delete_channel("operations").await.unwrap();
1000        assert!(store.list_bans().await.unwrap().is_empty(), "deleting a channel must drop its bans");
1001    }
1002
1003    #[tokio::test]
1004    async fn users_can_be_listed_and_deleted() {
1005        let store = store().await;
1006        store.create_user("aaron").await.unwrap();
1007        store.create_user("david").await.unwrap();
1008
1009        assert_eq!(store.list_users().await.unwrap().len(), 2);
1010
1011        store.delete_user("david").await.unwrap();
1012        let remaining = store.list_users().await.unwrap();
1013        assert_eq!(remaining.len(), 1);
1014        assert_eq!(remaining[0].username, "aaron");
1015    }
1016}