Skip to main content

conclavelib/
store.rs

1//! Embedded `SurrealDB` store: durable config only, behind a thin per-table repository.
2//!
3//! The store is `SurrealDB` **embedded** — the official SDK with a local KV backend, so
4//! `conclave serve` stays a single self-contained binary with a data directory and no external DB
5//! process (DESIGN.md §15). [`Store::open`] uses the pure-Rust `SurrealKV` backend for persistence;
6//! [`Store::open_in_memory`] backs hermetic tests.
7//!
8//! There is no ORM: storage records are the SDK's own typed layer (`SurrealValue`), and this module
9//! maps between them and the domain types. Only durable config lives here (`user`, `machine`,
10//! `channel`, `invite`, with the uniqueness constraints from DESIGN.md §15); presence,
11//! subscriptions, permission levels, and the admin allowlist are deliberately not persisted.
12
13use std::path::Path;
14
15use anyhow::Context as _;
16use surrealdb::{
17    Surreal,
18    engine::local::{Db, Mem, SurrealKv},
19    types::{SurrealValue, Value},
20};
21
22use crate::base::{Res, Visibility, Void};
23
/// The single namespace the embedded store selects at open.
const NAMESPACE: &str = "conclave";
/// The single database (within [`NAMESPACE`]) the embedded store selects at open.
const DATABASE: &str = "conclave";
27
/// Schema definition run at open: the uniqueness constraints from DESIGN.md §15. `IF NOT EXISTS`
/// keeps re-opening a persistent store idempotent.
///
/// Besides the unique indexes (`user.username`, `machine.pubkey`, `machine.(user, name)`,
/// `channel.name`, `invite.token`, `membership.(channel, user)`, `ban.(channel, user)`), it also
/// defines the `meta` and `message` tables and a non-unique `(channel, ts_ms)` index on `message`.
const SCHEMA: &str = "\
DEFINE INDEX IF NOT EXISTS user_username ON user FIELDS username UNIQUE;
DEFINE INDEX IF NOT EXISTS machine_pubkey ON machine FIELDS pubkey UNIQUE;
DEFINE INDEX IF NOT EXISTS machine_user_name ON machine FIELDS user, name UNIQUE;
DEFINE INDEX IF NOT EXISTS channel_name ON channel FIELDS name UNIQUE;
DEFINE INDEX IF NOT EXISTS invite_token ON invite FIELDS token UNIQUE;
DEFINE INDEX IF NOT EXISTS membership_channel_user ON membership FIELDS channel, user UNIQUE;
DEFINE INDEX IF NOT EXISTS ban_channel_user ON ban FIELDS channel, user UNIQUE;
DEFINE TABLE IF NOT EXISTS meta;
DEFINE TABLE IF NOT EXISTS message;
DEFINE INDEX IF NOT EXISTS message_channel_ts ON message FIELDS channel, ts_ms;
";
42
/// One retained channel message (PRD-0013 T-001). The payload is the wire envelope stored
/// verbatim as opaque bytes, so E2E ciphertext (PRD-0010) is retained without being readable.
///
/// Rows are written to the `message` table and queried by `channel` and `ts_ms`.
#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
pub struct MessageRecord {
    /// The channel the message was posted to.
    pub channel: String,
    /// The sender's full `user/machine/session` path.
    pub from: String,
    /// The bincode-encoded [`Payload`](crate::protocol::Payload) envelope, verbatim.
    pub payload: Vec<u8>,
    /// Server-stamped receive time, epoch milliseconds (the read-since watermark unit).
    pub ts_ms: i64,
}
56
/// Server-lifetime metadata, held as the single fixed record `meta:server` (PRD-0012 T-003).
///
/// The fixed record id is what makes first-boot creation fail (rather than duplicate) when the
/// record already exists.
#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
pub struct MetaRecord {
    /// The server's persistent random instance identifier.
    pub instance_id: String,
}
63
/// A registered account (`username` unique per server, DESIGN.md §15).
///
/// Stored in the `user` table; uniqueness is enforced by the `user_username` index.
#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
pub struct UserRecord {
    /// The account name.
    pub username: String,
    /// RFC 3339 creation timestamp.
    pub created_at: String,
}
72
/// An enrolled machine keypair under a user (`pubkey` globally unique; `name` unique within the
/// user, DESIGN.md §5, §15).
///
/// Stored in the `machine` table; the `machine_pubkey` and `machine_user_name` indexes enforce
/// the two uniqueness rules.
#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
pub struct MachineRecord {
    /// The owning username.
    pub user: String,
    /// The machine name (unique within the user).
    pub name: String,
    /// The machine's public key, base64-encoded (globally unique).
    pub pubkey: String,
    /// RFC 3339 enrollment timestamp.
    pub added_at: String,
}
86
/// A channel (`name` unique, DESIGN.md §6, §15). Membership (the ACL) is normalized into the
/// `membership` table rather than an embedded array, so concurrent joins insert distinct records
/// instead of contending on one row (PRD-0007 T-003).
///
/// Stored in the `channel` table; uniqueness is enforced by the `channel_name` index.
#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
pub struct ChannelRecord {
    /// The channel name.
    pub name: String,
    /// The visibility tier token (see [`Visibility::as_str`]).
    pub visibility: String,
    /// The creating (and administering) user.
    pub created_by: String,
    /// RFC 3339 creation timestamp.
    pub created_at: String,
}
101
/// An invite token for a channel (`token` unique, DESIGN.md §6, §15).
///
/// Stored in the `invite` table; uniqueness is enforced by the `invite_token` index.
#[derive(Debug, Clone, PartialEq, Eq, SurrealValue)]
pub struct InviteRecord {
    /// The channel the token grants access to.
    pub channel: String,
    /// The opaque token string.
    pub token: String,
    /// Remaining redemptions, or unlimited if absent.
    pub uses_remaining: Option<i64>,
    /// RFC 3339 expiry, or non-expiring if absent.
    pub expires_at: Option<String>,
    /// The creating user.
    pub created_by: String,
}
116
117// Query variable bindings (SurrealDB 3.x binds a `SurrealValue` object of variables).
118
/// Binds `$username` for lookups in the `user` table.
#[derive(SurrealValue)]
struct ByUsername {
    /// The account name to match.
    username: String,
}
123
/// Binds `$pubkey` for lookups in the `machine` table.
#[derive(SurrealValue)]
struct ByPubkey {
    /// The base64-encoded public key to match.
    pubkey: String,
}
128
/// Binds `$user` for queries that filter by a username (machines, memberships, created channels).
#[derive(SurrealValue)]
struct ByUser {
    /// The username to match.
    user: String,
}
133
/// Binds `$name` for queries that address a channel by its name.
#[derive(SurrealValue)]
struct ByName {
    /// The channel name to match.
    name: String,
}
138
/// Binds `$tok` for invite lookups by token.
#[derive(SurrealValue)]
struct ByToken {
    // `token` is a protected variable name in SurrealQL, so bind under `tok`.
    tok: String,
}
144
/// Binds `$user` and `$name` to address one machine by its `(user, name)` pair.
#[derive(SurrealValue)]
struct ByUserAndName {
    /// The owning username.
    user: String,
    /// The machine name within that user.
    name: String,
}
150
/// Binds `$name` and `$visibility` for the channel visibility update.
#[derive(SurrealValue)]
struct SetVisibility {
    /// The channel name to update.
    name: String,
    /// The new visibility tier token.
    visibility: String,
}
156
/// Binds `$old` and `$new` for the channel rename and its follow-up table migrations.
#[derive(SurrealValue)]
struct Rename {
    /// The current channel name.
    old: String,
    /// The replacement channel name.
    new: String,
}
162
/// Binds `$channel`, `$since_ms`, and `$cap` for the message-history read.
#[derive(SurrealValue)]
struct ReadSinceBind {
    /// The channel whose history is read.
    channel: String,
    /// Exclusive lower bound on `ts_ms`, epoch milliseconds.
    since_ms: i64,
    /// Maximum number of rows to return (the query's `LIMIT`).
    cap: i64,
}
169
/// Binds `$cutoff_ms` for the retention purge.
#[derive(SurrealValue)]
struct PurgeBind {
    /// Messages with `ts_ms` strictly below this value are deleted.
    cutoff_ms: i64,
}
174
/// Binds `$tok` and `$uses`.
// NOTE(review): presumably used by an invite `uses_remaining` update outside this view — confirm.
#[derive(SurrealValue)]
struct SetUses {
    // `token` is a protected variable name in SurrealQL, so bind under `tok`.
    tok: String,
    uses: i64,
}
181
/// A `(channel, user)` pair: binds `$channel` / `$user` for membership and ban queries, and is
/// also the decode target for rows selected as `channel, user`.
#[derive(SurrealValue)]
struct Membership {
    /// The channel name.
    channel: String,
    /// The username.
    user: String,
}
187
/// Binds `$channel` for queries that filter by channel name (invites, memberships, bans).
#[derive(SurrealValue)]
struct ByChannel {
    /// The channel name to match.
    channel: String,
}
192
/// A bounded cap on optimistic-concurrency retries: high enough to clear realistic contention on a
/// single channel record, low enough that a genuinely stuck write still surfaces (DESIGN.md §15).
///
/// This counts total attempts (the first try included), not additional retries.
const MAX_WRITE_ATTEMPTS: usize = 64;
196
197/// Whether a `SurrealDB` error is an optimistic-concurrency write-write conflict (`SurrealKV`
198/// surfaces `TransactionWriteConflict`; `SurrealDB` maps it to `TransactionConflict`, both rendering
199/// with "conflict"). These are expected under concurrent load and must be retried per `SurrealDB`'s
200/// optimistic-concurrency contract — the loser of a same-key write re-applies its statement — rather
201/// than serialized behind an application lock (DESIGN.md §15).
202fn is_write_conflict(err: &surrealdb::Error) -> bool {
203    err.to_string().to_lowercase().contains("conflict")
204}
205
/// The embedded store: a thin typed repository over an embedded `SurrealDB` instance. Cloning
/// yields another handle to the same database (the inner client is a shared handle).
#[derive(Clone)]
pub struct Store {
    /// The embedded `SurrealDB` client every repository method issues its queries through.
    db: Surreal<Db>,
}
212
213impl Store {
214    /// Opens (or creates) a persistent store rooted at `path` using the `SurrealKV` backend.
215    ///
216    /// # Errors
217    ///
218    /// Returns an error if the backend cannot be opened or the schema cannot be applied.
219    pub async fn open(path: &Path) -> Res<Self> {
220        let db = Surreal::new::<SurrealKv>(path.to_string_lossy().as_ref()).await.context("failed to open the embedded store")?;
221        Self::init(db).await
222    }
223
224    /// Opens an ephemeral in-memory store (for tests).
225    ///
226    /// # Errors
227    ///
228    /// Returns an error if the in-memory backend cannot be initialized.
229    pub async fn open_in_memory() -> Res<Self> {
230        let db = Surreal::new::<Mem>(()).await.context("failed to open the in-memory store")?;
231        Self::init(db).await
232    }
233
234    async fn init(db: Surreal<Db>) -> Res<Self> {
235        db.use_ns(NAMESPACE).use_db(DATABASE).await.context("failed to select namespace/database")?;
236        db.query(SCHEMA).await.context("failed to apply schema")?.check().context("schema application reported an error")?;
237        Ok(Self { db })
238    }
239
240    async fn insert<T: SurrealValue>(&self, table: &str, record: T) -> Void {
241        let _created: Option<Value> = self.db.create(table.to_owned()).content(record).await.with_context(|| format!("failed to insert into `{table}`"))?;
242        Ok(())
243    }
244
245    /// Returns the server's persistent instance ID, generating and storing one on first call.
246    ///
247    /// The ID is what lets a bridge recognize the same server reached under two different URLs
248    /// (PRD-0012 T-003); it is random, carries no meaning, and never changes for a data dir.
249    ///
250    /// # Errors
251    ///
252    /// Returns an error if the read or the first-boot write fails.
253    pub async fn instance_id(&self) -> Res<String> {
254        let mut response = self.db.query("SELECT * OMIT id FROM meta").await.context("failed to query server meta")?;
255        let rows: Vec<MetaRecord> = response.take(0).context("failed to decode server meta")?;
256        if let Some(meta) = rows.into_iter().next() {
257            return Ok(meta.instance_id);
258        }
259
260        let id = crate::identity::generate_token()?;
261        // A fixed record id makes first-boot generation race-free: a concurrent second writer
262        // errors on the existing record and re-reads instead of minting a divergent ID.
263        let created: Result<Option<MetaRecord>, _> = self.db.create(("meta", "server")).content(MetaRecord { instance_id: id.clone() }).await;
264        if created.is_ok() {
265            return Ok(id);
266        }
267        let mut response = self.db.query("SELECT * OMIT id FROM meta").await.context("failed to re-query server meta")?;
268        let rows: Vec<MetaRecord> = response.take(0).context("failed to decode server meta")?;
269        rows.into_iter().next().map(|meta| meta.instance_id).context("server meta write raced but no record exists")
270    }
271
272    /// Appends one channel message to the retained history (PRD-0013 T-001).
273    ///
274    /// # Errors
275    ///
276    /// Returns an error if the write fails.
277    pub async fn append_message(&self, channel: &str, from: &str, payload: &[u8], ts_ms: i64) -> Void {
278        self.insert(
279            "message",
280            MessageRecord {
281                channel: channel.to_owned(),
282                from: from.to_owned(),
283                payload: payload.to_vec(),
284                ts_ms,
285            },
286        )
287        .await
288    }
289
290    /// Reads a channel's retained messages strictly after `since_ms`, oldest-first, capped at
291    /// `cap` rows (the caller re-asks with the last row's `ts_ms` to page).
292    ///
293    /// # Errors
294    ///
295    /// Returns an error if the query fails.
296    pub async fn read_messages_since(&self, channel: &str, since_ms: i64, cap: usize) -> Res<Vec<MessageRecord>> {
297        let mut response = self
298            .db
299            .query("SELECT * OMIT id FROM message WHERE channel = $channel AND ts_ms > $since_ms ORDER BY ts_ms ASC LIMIT $cap")
300            .bind(ReadSinceBind {
301                channel: channel.to_owned(),
302                since_ms,
303                cap: i64::try_from(cap).unwrap_or(i64::MAX),
304            })
305            .await
306            .context("failed to query message history")?;
307        let rows: Vec<MessageRecord> = response.take(0).context("failed to decode message history")?;
308        Ok(rows)
309    }
310
311    /// Deletes every retained message older than `cutoff_ms` (the retention sweep, PRD-0013).
312    ///
313    /// # Errors
314    ///
315    /// Returns an error if the delete fails.
316    pub async fn purge_messages_before(&self, cutoff_ms: i64) -> Void {
317        self.db
318            .query("DELETE FROM message WHERE ts_ms < $cutoff_ms")
319            .bind(PurgeBind { cutoff_ms })
320            .await
321            .context("failed to purge message history")?
322            .check()
323            .context("message purge reported an error")?;
324        Ok(())
325    }
326
327    /// Creates a user, enforcing the unique-username constraint.
328    ///
329    /// # Errors
330    ///
331    /// Returns an error if the username is already taken or the write fails.
332    pub async fn create_user(&self, username: &str) -> Res<UserRecord> {
333        let record = UserRecord {
334            username: username.to_owned(),
335            created_at: now_rfc3339(),
336        };
337        self.insert("user", record.clone()).await?;
338        Ok(record)
339    }
340
341    /// Fetches a user by username.
342    ///
343    /// # Errors
344    ///
345    /// Returns an error if the query fails.
346    pub async fn get_user(&self, username: &str) -> Res<Option<UserRecord>> {
347        let mut response = self
348            .db
349            .query("SELECT * OMIT id FROM user WHERE username = $username")
350            .bind(ByUsername { username: username.to_owned() })
351            .await
352            .context("failed to query user")?;
353        let rows: Vec<UserRecord> = response.take(0).context("failed to decode user rows")?;
354        Ok(rows.into_iter().next())
355    }
356
357    /// Enrolls a machine, enforcing the globally-unique pubkey and per-user-unique name constraints.
358    ///
359    /// # Errors
360    ///
361    /// Returns an error if the pubkey is already enrolled, the name collides within the user, or the
362    /// write fails.
363    pub async fn create_machine(&self, user: &str, name: &str, pubkey_base64: &str) -> Res<MachineRecord> {
364        let record = MachineRecord {
365            user: user.to_owned(),
366            name: name.to_owned(),
367            pubkey: pubkey_base64.to_owned(),
368            added_at: now_rfc3339(),
369        };
370        self.insert("machine", record.clone()).await?;
371        Ok(record)
372    }
373
374    /// Fetches a machine by its base64 public key.
375    ///
376    /// # Errors
377    ///
378    /// Returns an error if the query fails.
379    pub async fn get_machine_by_pubkey(&self, pubkey_base64: &str) -> Res<Option<MachineRecord>> {
380        let mut response = self
381            .db
382            .query("SELECT * OMIT id FROM machine WHERE pubkey = $pubkey")
383            .bind(ByPubkey { pubkey: pubkey_base64.to_owned() })
384            .await
385            .context("failed to query machine")?;
386        let rows: Vec<MachineRecord> = response.take(0).context("failed to decode machine rows")?;
387        Ok(rows.into_iter().next())
388    }
389
390    /// Lists the machines enrolled under a user.
391    ///
392    /// # Errors
393    ///
394    /// Returns an error if the query fails.
395    pub async fn list_machines(&self, user: &str) -> Res<Vec<MachineRecord>> {
396        let mut response = self
397            .db
398            .query("SELECT * OMIT id FROM machine WHERE user = $user")
399            .bind(ByUser { user: user.to_owned() })
400            .await
401            .context("failed to list machines")?;
402        response.take(0).context("failed to decode machine rows")
403    }
404
405    /// Revokes a machine by `(user, name)`.
406    ///
407    /// # Errors
408    ///
409    /// Returns an error if the delete fails.
410    pub async fn delete_machine(&self, user: &str, name: &str) -> Void {
411        self.db
412            .query("DELETE machine WHERE user = $user AND name = $name")
413            .bind(ByUserAndName {
414                user: user.to_owned(),
415                name: name.to_owned(),
416            })
417            .await
418            .context("failed to delete machine")?
419            .check()
420            .context("machine delete reported an error")?;
421        Ok(())
422    }
423
424    /// Creates a channel, enforcing the unique-name constraint.
425    ///
426    /// # Errors
427    ///
428    /// Returns an error if the name is already taken or the write fails.
429    pub async fn create_channel(&self, name: &str, visibility: Visibility, created_by: &str) -> Res<ChannelRecord> {
430        let record = ChannelRecord {
431            name: name.to_owned(),
432            visibility: visibility.as_str().to_owned(),
433            created_by: created_by.to_owned(),
434            created_at: now_rfc3339(),
435        };
436        self.insert("channel", record.clone()).await?;
437        // The creator is the channel's first member (it also administers via `created_by`).
438        self.add_channel_member(name, created_by).await?;
439        Ok(record)
440    }
441
442    /// Fetches a channel by name.
443    ///
444    /// # Errors
445    ///
446    /// Returns an error if the query fails.
447    pub async fn get_channel(&self, name: &str) -> Res<Option<ChannelRecord>> {
448        let mut response = self
449            .db
450            .query("SELECT * OMIT id FROM channel WHERE name = $name")
451            .bind(ByName { name: name.to_owned() })
452            .await
453            .context("failed to query channel")?;
454        let rows: Vec<ChannelRecord> = response.take(0).context("failed to decode channel rows")?;
455        Ok(rows.into_iter().next())
456    }
457
458    /// Creates an invite token, enforcing the unique-token constraint.
459    ///
460    /// # Errors
461    ///
462    /// Returns an error if the token already exists or the write fails.
463    pub async fn create_invite(&self, channel: &str, token: &str, uses_remaining: Option<i64>, expires_at: Option<String>, created_by: &str) -> Res<InviteRecord> {
464        let record = InviteRecord {
465            channel: channel.to_owned(),
466            token: token.to_owned(),
467            uses_remaining,
468            expires_at,
469            created_by: created_by.to_owned(),
470        };
471        self.insert("invite", record.clone()).await?;
472        Ok(record)
473    }
474
475    /// Fetches an invite by token.
476    ///
477    /// # Errors
478    ///
479    /// Returns an error if the query fails.
480    pub async fn get_invite(&self, token: &str) -> Res<Option<InviteRecord>> {
481        let mut response = self
482            .db
483            .query("SELECT * OMIT id FROM invite WHERE token = $tok")
484            .bind(ByToken { tok: token.to_owned() })
485            .await
486            .context("failed to query invite")?;
487        let rows: Vec<InviteRecord> = response.take(0).context("failed to decode invite rows")?;
488        Ok(rows.into_iter().next())
489    }
490
491    /// The outstanding invites for one channel (the channel-admin audit view).
492    ///
493    /// # Errors
494    ///
495    /// Returns an error if the query fails.
496    pub async fn list_invites(&self, channel: &str) -> Res<Vec<InviteRecord>> {
497        let mut response = self
498            .db
499            .query("SELECT * OMIT id FROM invite WHERE channel = $channel")
500            .bind(ByChannel { channel: channel.to_owned() })
501            .await
502            .context("failed to list invites")?;
503        response.take(0).context("failed to decode invite rows")
504    }
505
506    /// Lists every channel; the caller applies visibility / membership gating (DESIGN.md §6).
507    ///
508    /// # Errors
509    ///
510    /// Returns an error if the query fails.
511    pub async fn list_channels(&self) -> Res<Vec<ChannelRecord>> {
512        let mut response = self.db.query("SELECT * OMIT id FROM channel").await.context("failed to list channels")?;
513        response.take(0).context("failed to decode channel rows")
514    }
515
516    /// Adds `user` to a channel's membership (its ACL), idempotently. Each membership is its own
517    /// record under the unique `(channel, user)` index, so concurrent adds of different users write
518    /// distinct keys and never contend on a shared row (PRD-0007 T-003); a conflict on the same pair
519    /// is retried per `SurrealDB`'s optimistic-concurrency contract.
520    ///
521    /// # Errors
522    ///
523    /// Returns an error if the write keeps conflicting past the retry cap or otherwise fails.
524    pub async fn add_channel_member(&self, channel: &str, user: &str) -> Void {
525        for attempt in 0..MAX_WRITE_ATTEMPTS {
526            let outcome = self
527                .db
528                .query("INSERT INTO membership { channel: $channel, user: $user } ON DUPLICATE KEY UPDATE channel = $channel")
529                .bind(Membership {
530                    channel: channel.to_owned(),
531                    user: user.to_owned(),
532                })
533                .await
534                .and_then(surrealdb::IndexedResults::check);
535            match outcome {
536                Ok(_) => return Ok(()),
537                Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
538                Err(e) => return Err(anyhow::Error::new(e).context("failed to add channel member")),
539            }
540        }
541        anyhow::bail!("adding a channel member exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
542    }
543
544    /// Removes `user` from a channel's membership; idempotent (removing a non-member is a no-op).
545    ///
546    /// # Errors
547    ///
548    /// Returns an error if the delete keeps conflicting past the retry cap or otherwise fails.
549    pub async fn remove_channel_member(&self, channel: &str, user: &str) -> Void {
550        for attempt in 0..MAX_WRITE_ATTEMPTS {
551            let outcome = self
552                .db
553                .query("DELETE membership WHERE channel = $channel AND user = $user")
554                .bind(Membership {
555                    channel: channel.to_owned(),
556                    user: user.to_owned(),
557                })
558                .await
559                .and_then(surrealdb::IndexedResults::check);
560            match outcome {
561                Ok(_) => return Ok(()),
562                Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
563                Err(e) => return Err(anyhow::Error::new(e).context("failed to remove channel member")),
564            }
565        }
566        anyhow::bail!("removing a channel member exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
567    }
568
569    /// Whether `user` is a member of `channel`.
570    ///
571    /// # Errors
572    ///
573    /// Returns an error if the query fails.
574    pub async fn is_channel_member(&self, channel: &str, user: &str) -> Res<bool> {
575        let mut response = self
576            .db
577            .query("SELECT VALUE user FROM membership WHERE channel = $channel AND user = $user")
578            .bind(Membership {
579                channel: channel.to_owned(),
580                user: user.to_owned(),
581            })
582            .await
583            .context("failed to query membership")?;
584        let rows: Vec<String> = response.take(0).context("failed to decode membership rows")?;
585        Ok(!rows.is_empty())
586    }
587
588    /// The channels `user` is a member of (for discovery gating, DESIGN.md §6).
589    ///
590    /// # Errors
591    ///
592    /// Returns an error if the query fails.
593    pub async fn list_user_memberships(&self, user: &str) -> Res<Vec<String>> {
594        let mut response = self
595            .db
596            .query("SELECT VALUE channel FROM membership WHERE user = $user")
597            .bind(ByUser { user: user.to_owned() })
598            .await
599            .context("failed to list user memberships")?;
600        response.take(0).context("failed to decode membership channels")
601    }
602
603    /// The members of a channel (its ACL users).
604    ///
605    /// # Errors
606    ///
607    /// Returns an error if the query fails.
608    pub async fn list_channel_members(&self, channel: &str) -> Res<Vec<String>> {
609        let mut response = self
610            .db
611            .query("SELECT VALUE user FROM membership WHERE channel = $channel")
612            .bind(ByChannel { channel: channel.to_owned() })
613            .await
614            .context("failed to list channel members")?;
615        response.take(0).context("failed to decode membership users")
616    }
617
618    /// Records a channel ban for `user`; idempotent (banning twice is a no-op). Durable so bans
619    /// survive a server restart; the hub mirrors them in memory for its lock-guarded checks.
620    ///
621    /// # Errors
622    ///
623    /// Returns an error if the insert keeps conflicting past the retry cap or otherwise fails.
624    pub async fn add_ban(&self, channel: &str, user: &str) -> Void {
625        for attempt in 0..MAX_WRITE_ATTEMPTS {
626            let outcome = self
627                .db
628                .query("INSERT INTO ban { channel: $channel, user: $user } ON DUPLICATE KEY UPDATE channel = $channel")
629                .bind(Membership {
630                    channel: channel.to_owned(),
631                    user: user.to_owned(),
632                })
633                .await
634                .and_then(surrealdb::IndexedResults::check);
635            match outcome {
636                Ok(_) => return Ok(()),
637                Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
638                Err(e) => return Err(anyhow::Error::new(e).context("failed to add ban")),
639            }
640        }
641        anyhow::bail!("adding a ban exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
642    }
643
644    /// Lifts a channel ban; idempotent (removing an absent ban is a no-op).
645    ///
646    /// # Errors
647    ///
648    /// Returns an error if the delete keeps conflicting past the retry cap or otherwise fails.
649    pub async fn remove_ban(&self, channel: &str, user: &str) -> Void {
650        for attempt in 0..MAX_WRITE_ATTEMPTS {
651            let outcome = self
652                .db
653                .query("DELETE ban WHERE channel = $channel AND user = $user")
654                .bind(Membership {
655                    channel: channel.to_owned(),
656                    user: user.to_owned(),
657                })
658                .await
659                .and_then(surrealdb::IndexedResults::check);
660            match outcome {
661                Ok(_) => return Ok(()),
662                Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
663                Err(e) => return Err(anyhow::Error::new(e).context("failed to remove ban")),
664            }
665        }
666        anyhow::bail!("removing a ban exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
667    }
668
669    /// Every persisted `(channel, user)` ban, for loading the hub's in-memory view at startup.
670    ///
671    /// # Errors
672    ///
673    /// Returns an error if the query fails.
674    pub async fn list_bans(&self) -> Res<Vec<(String, String)>> {
675        let mut response = self.db.query("SELECT channel, user FROM ban").await.context("failed to list bans")?;
676        let rows: Vec<Membership> = response.take(0).context("failed to decode ban rows")?;
677        Ok(rows.into_iter().map(|row| (row.channel, row.user)).collect())
678    }
679
680    /// The users banned from one channel (the channel-admin audit view).
681    ///
682    /// # Errors
683    ///
684    /// Returns an error if the query fails.
685    pub async fn list_channel_bans(&self, channel: &str) -> Res<Vec<String>> {
686        let mut response = self
687            .db
688            .query("SELECT VALUE user FROM ban WHERE channel = $channel")
689            .bind(ByChannel { channel: channel.to_owned() })
690            .await
691            .context("failed to list channel bans")?;
692        response.take(0).context("failed to decode ban users")
693    }
694
695    /// The names of the channels created (and administered) by `user`.
696    ///
697    /// # Errors
698    ///
699    /// Returns an error if the query fails.
700    pub async fn list_channels_created_by(&self, user: &str) -> Res<Vec<String>> {
701        let mut response = self
702            .db
703            .query("SELECT VALUE name FROM channel WHERE created_by = $user")
704            .bind(ByUser { user: user.to_owned() })
705            .await
706            .context("failed to list created channels")?;
707        response.take(0).context("failed to decode channel names")
708    }
709
710    /// Removes every membership held by `user` (used when a user is removed, DESIGN.md §7).
711    ///
712    /// # Errors
713    ///
714    /// Returns an error if the delete fails.
715    pub async fn delete_user_memberships(&self, user: &str) -> Void {
716        self.db
717            .query("DELETE membership WHERE user = $user")
718            .bind(ByUser { user: user.to_owned() })
719            .await
720            .context("failed to delete user memberships")?
721            .check()
722            .context("user membership delete reported an error")?;
723        Ok(())
724    }
725
726    /// Changes a channel's visibility tier.
727    ///
728    /// # Errors
729    ///
730    /// Returns an error if the update fails.
731    pub async fn set_channel_visibility(&self, name: &str, visibility: Visibility) -> Void {
732        self.db
733            .query("UPDATE channel SET visibility = $visibility WHERE name = $name")
734            .bind(SetVisibility {
735                name: name.to_owned(),
736                visibility: visibility.as_str().to_owned(),
737            })
738            .await
739            .context("failed to update channel visibility")?
740            .check()
741            .context("channel visibility update reported an error")?;
742        Ok(())
743    }
744
745    /// Renames a channel, enforcing the unique-name constraint on the new name.
746    ///
747    /// # Errors
748    ///
749    /// Returns an error if the new name is already taken or the update fails.
750    pub async fn rename_channel(&self, old: &str, new: &str) -> Void {
751        self.db
752            .query("UPDATE channel SET name = $new WHERE name = $old")
753            .bind(Rename { old: old.to_owned(), new: new.to_owned() })
754            .await
755            .context("failed to rename channel")?
756            .check()
757            .context("channel rename reported an error")?;
758        // Keep memberships and invites attached to the renamed channel (PRD-0007 T-004).
759        self.db
760            .query("UPDATE membership SET channel = $new WHERE channel = $old")
761            .bind(Rename { old: old.to_owned(), new: new.to_owned() })
762            .await
763            .context("failed to migrate channel memberships")?
764            .check()
765            .context("membership rename reported an error")?;
766        self.db
767            .query("UPDATE invite SET channel = $new WHERE channel = $old")
768            .bind(Rename { old: old.to_owned(), new: new.to_owned() })
769            .await
770            .context("failed to migrate channel invites")?
771            .check()
772            .context("invite rename reported an error")?;
773        self.db
774            .query("UPDATE ban SET channel = $new WHERE channel = $old")
775            .bind(Rename { old: old.to_owned(), new: new.to_owned() })
776            .await
777            .context("failed to migrate channel bans")?
778            .check()
779            .context("ban rename reported an error")?;
780        self.db
781            .query("UPDATE message SET channel = $new WHERE channel = $old")
782            .bind(Rename { old: old.to_owned(), new: new.to_owned() })
783            .await
784            .context("failed to migrate channel history")?
785            .check()
786            .context("message rename reported an error")?;
787        Ok(())
788    }
789
790    /// Deletes a channel.
791    ///
792    /// # Errors
793    ///
794    /// Returns an error if the delete fails.
795    pub async fn delete_channel(&self, name: &str) -> Void {
796        self.db
797            .query("DELETE channel WHERE name = $name")
798            .bind(ByName { name: name.to_owned() })
799            .await
800            .context("failed to delete channel")?
801            .check()
802            .context("channel delete reported an error")?;
803        // Drop the channel's memberships and invites so a future same-named channel cannot inherit
804        // them (invite cascade — PRD-0007 T-004, finding #5).
805        self.db
806            .query("DELETE membership WHERE channel = $channel")
807            .bind(ByChannel { channel: name.to_owned() })
808            .await
809            .context("failed to delete channel memberships")?
810            .check()
811            .context("membership delete reported an error")?;
812        self.db
813            .query("DELETE invite WHERE channel = $channel")
814            .bind(ByChannel { channel: name.to_owned() })
815            .await
816            .context("failed to delete channel invites")?
817            .check()
818            .context("invite delete reported an error")?;
819        self.db
820            .query("DELETE ban WHERE channel = $channel")
821            .bind(ByChannel { channel: name.to_owned() })
822            .await
823            .context("failed to delete channel bans")?
824            .check()
825            .context("ban delete reported an error")?;
826        // History dies with the channel — a future same-named channel must not inherit it, and a
827        // deleted channel's contents must not be readable ever again (PRD-0013).
828        self.db
829            .query("DELETE message WHERE channel = $channel")
830            .bind(ByChannel { channel: name.to_owned() })
831            .await
832            .context("failed to delete channel history")?
833            .check()
834            .context("message delete reported an error")?;
835        Ok(())
836    }
837
838    /// Sets an invite's remaining redemptions (used when redeeming a limited-use token).
839    ///
840    /// # Errors
841    ///
842    /// Returns an error if the update fails.
843    pub async fn set_invite_uses(&self, token: &str, uses_remaining: i64) -> Void {
844        self.db
845            .query("UPDATE invite SET uses_remaining = $uses WHERE token = $tok")
846            .bind(SetUses {
847                tok: token.to_owned(),
848                uses: uses_remaining,
849            })
850            .await
851            .context("failed to update invite uses")?
852            .check()
853            .context("invite uses update reported an error")?;
854        Ok(())
855    }
856
857    /// Atomically consumes one redemption of a limited-use invite: decrements `uses_remaining` only
858    /// while it is positive, returning whether a use was claimed. The guarded single-statement update
859    /// (retried on an optimistic-concurrency conflict) makes concurrent redeemers of the last use
860    /// mutually exclusive, so a single-use token admits exactly one (PRD-0007 T-003). The caller
861    /// handles unlimited (`None`) tokens and expiry; an exhausted token is deleted.
862    ///
863    /// # Errors
864    ///
865    /// Returns an error if the update keeps conflicting past the retry cap or otherwise fails.
866    pub async fn try_consume_invite_use(&self, token: &str) -> Res<bool> {
867        for attempt in 0..MAX_WRITE_ATTEMPTS {
868            let outcome = self
869                .db
870                .query("UPDATE invite SET uses_remaining = uses_remaining - 1 WHERE token = $tok AND uses_remaining > 0 RETURN VALUE uses_remaining")
871                .bind(ByToken { tok: token.to_owned() })
872                .await
873                .and_then(surrealdb::IndexedResults::check);
874            match outcome {
875                Ok(mut response) => {
876                    let remaining: Vec<i64> = response.take(0).context("failed to decode invite uses")?;
877                    return match remaining.into_iter().next() {
878                        // No positive-use row matched — the token was already spent (or removed).
879                        None => Ok(false),
880                        // This redemption took the last use; delete the spent token.
881                        Some(0) => {
882                            self.delete_invite(token).await?;
883                            Ok(true)
884                        }
885                        // A use was consumed with more remaining.
886                        Some(_) => Ok(true),
887                    };
888                }
889                Err(e) if is_write_conflict(&e) && attempt + 1 < MAX_WRITE_ATTEMPTS => tokio::task::yield_now().await,
890                Err(e) => return Err(anyhow::Error::new(e).context("failed to consume invite use")),
891            }
892        }
893        anyhow::bail!("consuming an invite use exhausted {MAX_WRITE_ATTEMPTS} write-conflict retries")
894    }
895
896    /// Deletes an invite token (on revoke or when an exhausted token is redeemed).
897    ///
898    /// # Errors
899    ///
900    /// Returns an error if the delete fails.
901    pub async fn delete_invite(&self, token: &str) -> Void {
902        self.db
903            .query("DELETE invite WHERE token = $tok")
904            .bind(ByToken { tok: token.to_owned() })
905            .await
906            .context("failed to delete invite")?
907            .check()
908            .context("invite delete reported an error")?;
909        Ok(())
910    }
911
912    /// Lists every registered user (server-admin `user list`).
913    ///
914    /// # Errors
915    ///
916    /// Returns an error if the query fails.
917    pub async fn list_users(&self) -> Res<Vec<UserRecord>> {
918        let mut response = self.db.query("SELECT * OMIT id FROM user").await.context("failed to list users")?;
919        response.take(0).context("failed to decode user rows")
920    }
921
922    /// Deletes a user (server-admin `user remove`); the caller also revokes the user's machines.
923    ///
924    /// # Errors
925    ///
926    /// Returns an error if the delete fails.
927    pub async fn delete_user(&self, username: &str) -> Void {
928        self.db
929            .query("DELETE user WHERE username = $username")
930            .bind(ByUsername { username: username.to_owned() })
931            .await
932            .context("failed to delete user")?
933            .check()
934            .context("user delete reported an error")?;
935        Ok(())
936    }
937}
938
939fn now_rfc3339() -> String {
940    chrono::Utc::now().to_rfc3339()
941}
942
#[cfg(test)]
mod tests {
    // Tests relax `unwrap_used` (house convention; DESIGN.md §22).
    #![allow(clippy::unwrap_used)]

    use super::*;
    use pretty_assertions::assert_eq;

    /// Opens an in-memory store for a hermetic test.
    async fn fresh_store() -> Store {
        let opened = Store::open_in_memory().await;
        opened.unwrap()
    }

    #[tokio::test]
    async fn store_instance_id_is_stable_across_reopen() {
        let data_dir = tempfile::TempDir::new().unwrap();

        // The first handle lives only inside this block, so it is dropped before the reopen below.
        let original_id = {
            let db = Store::open(data_dir.path()).await.unwrap();
            let id = db.instance_id().await.unwrap();
            assert!(!id.is_empty());
            // Generated once, returned verbatim thereafter.
            let second_read = db.instance_id().await.unwrap();
            assert_eq!(second_read, id);
            id
        };

        // The ID must survive a server restart — it is what lets a bridge recognize the same
        // server behind two URLs (PRD-0012 T-003). SurrealKV releases its file lock
        // asynchronously after drop, so the reopen polls (bounded) until the lock frees.
        let mut reopened = None;
        for _ in 0..50 {
            if let Ok(db) = Store::open(data_dir.path()).await {
                reopened = Some(db);
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }
        let Some(reopened) = reopened else {
            panic!("the store never became reopenable after drop");
        };

        let id_after_reopen = reopened.instance_id().await.unwrap();
        assert_eq!(id_after_reopen, original_id);
    }

    #[tokio::test]
    async fn store_messages_append_and_read_since_in_order() {
        let db = fresh_store().await;
        db.append_message("ops", "aaron/ws/s1", b"one", 100).await.unwrap();
        db.append_message("ops", "aaron/ws/s1", b"two", 200).await.unwrap();
        db.append_message("other", "aaron/ws/s1", b"elsewhere", 150).await.unwrap();

        // Channel-scoped, oldest-first, strictly-after the watermark.
        let from_start = db.read_messages_since("ops", 50, 10).await.unwrap();
        let payloads: Vec<_> = from_start.iter().map(|m| m.payload.as_slice()).collect();
        assert_eq!(payloads, vec![b"one".as_slice(), b"two".as_slice()]);
        let oldest = &from_start[0];
        assert_eq!((oldest.ts_ms, oldest.from.as_str()), (100, "aaron/ws/s1"));

        let past_watermark = db.read_messages_since("ops", 100, 10).await.unwrap();
        assert_eq!(past_watermark.len(), 1, "since is exclusive: the watermark message itself is not re-delivered");
        assert_eq!(past_watermark[0].payload, b"two");

        // The page cap bounds a read, oldest-first (the client re-asks with the last ts).
        let single_page = db.read_messages_since("ops", 0, 1).await.unwrap();
        assert_eq!(single_page.len(), 1);
        assert_eq!(single_page[0].payload, b"one");
    }

    #[tokio::test]
    async fn store_messages_purge_before_removes_old_rows() {
        let db = fresh_store().await;
        db.append_message("ops", "aaron/ws/s1", b"old", 100).await.unwrap();
        db.append_message("ops", "aaron/ws/s1", b"new", 200).await.unwrap();

        // The cutoff sits between the two timestamps.
        db.purge_messages_before(150).await.unwrap();

        let survivors = db.read_messages_since("ops", 0, 10).await.unwrap();
        assert_eq!(survivors.len(), 1, "rows older than the retention cutoff must be gone");
        assert_eq!(survivors[0].payload, b"new");
    }

    #[tokio::test]
    async fn store_messages_cascade_on_channel_delete_and_rename() {
        let db = fresh_store().await;
        db.create_channel("ops", Visibility::Public, "aaron").await.unwrap();
        db.append_message("ops", "aaron/ws/s1", b"hello", 100).await.unwrap();

        // History follows a rename…
        db.rename_channel("ops", "ops2").await.unwrap();
        let under_new_name = db.read_messages_since("ops2", 0, 10).await.unwrap();
        assert_eq!(under_new_name.len(), 1);
        let under_old_name = db.read_messages_since("ops", 0, 10).await.unwrap();
        assert!(under_old_name.is_empty());

        // …and dies with the channel.
        db.delete_channel("ops2").await.unwrap();
        let after_delete = db.read_messages_since("ops2", 0, 10).await.unwrap();
        assert!(after_delete.is_empty());
    }

    #[tokio::test]
    async fn user_create_and_fetch_round_trip() {
        let db = fresh_store().await;
        let created = db.create_user("aaron").await.unwrap();

        let fetched = db.get_user("aaron").await.unwrap();
        assert_eq!(fetched, Some(created));

        let unknown = db.get_user("nobody").await.unwrap();
        assert_eq!(unknown, None);
    }

    #[tokio::test]
    async fn duplicate_username_is_rejected() {
        let db = fresh_store().await;
        db.create_user("aaron").await.unwrap();

        let second = db.create_user("aaron").await;
        assert!(second.is_err(), "the unique-username constraint must reject a duplicate");
    }

    #[tokio::test]
    async fn machine_pubkey_is_globally_unique() {
        let db = fresh_store().await;
        db.create_machine("aaron", "workstation", "PUBKEY-A").await.unwrap();

        // Same pubkey under a different user/name must still be rejected.
        let reused_key = db.create_machine("david", "desktop", "PUBKEY-A").await;
        assert!(reused_key.is_err());
    }

    #[tokio::test]
    async fn machine_name_is_unique_within_a_user_but_not_across_users() {
        let db = fresh_store().await;
        db.create_machine("aaron", "workstation", "PUBKEY-A").await.unwrap();

        // Same name, same user, different key -> rejected.
        let same_owner = db.create_machine("aaron", "workstation", "PUBKEY-B").await;
        assert!(same_owner.is_err());
        // Same name under a different user -> allowed.
        db.create_machine("david", "workstation", "PUBKEY-C").await.unwrap();
    }

    #[tokio::test]
    async fn machines_list_and_delete_for_a_user() {
        let db = fresh_store().await;
        db.create_machine("aaron", "workstation", "PUBKEY-A").await.unwrap();
        db.create_machine("aaron", "sno-box", "PUBKEY-B").await.unwrap();

        let before = db.list_machines("aaron").await.unwrap();
        assert_eq!(before.len(), 2);

        db.delete_machine("aaron", "sno-box").await.unwrap();
        let after = db.list_machines("aaron").await.unwrap();
        assert_eq!(after.len(), 1);
        assert_eq!(after[0].name, "workstation");
    }

    #[tokio::test]
    async fn channel_create_fetch_and_unique_name() {
        let db = fresh_store().await;
        let created = db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();

        assert_eq!(created.visibility, "private");
        let fetched = db.get_channel("ops").await.unwrap();
        assert_eq!(fetched, Some(created));

        let duplicate = db.create_channel("ops", Visibility::Public, "david").await;
        assert!(duplicate.is_err());
    }

    #[tokio::test]
    async fn invite_create_fetch_and_unique_token() {
        let db = fresh_store().await;
        let created = db.create_invite("ops", "tok-123", Some(5), None, "aaron").await.unwrap();

        let fetched = db.get_invite("tok-123").await.unwrap();
        assert_eq!(fetched, Some(created));

        let duplicate = db.create_invite("ops", "tok-123", None, None, "aaron").await;
        assert!(duplicate.is_err());
    }

    #[tokio::test]
    async fn channel_membership_add_remove_and_list() {
        let db = fresh_store().await;
        db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        // The creator is seeded as the first member.
        let creator_is_member = db.is_channel_member("ops", "aaron").await.unwrap();
        assert!(creator_is_member);

        db.add_channel_member("ops", "david").await.unwrap();
        // Idempotent: re-adding an existing member is a no-op, not a duplicate or an error.
        db.add_channel_member("ops", "david").await.unwrap();
        let david_is_member = db.is_channel_member("ops", "david").await.unwrap();
        assert!(david_is_member);

        let mut roster = db.list_channel_members("ops").await.unwrap();
        roster.sort();
        assert_eq!(roster, vec![String::from("aaron"), String::from("david")]);
        let davids_channels = db.list_user_memberships("david").await.unwrap();
        assert_eq!(davids_channels, vec![String::from("ops")]);

        db.remove_channel_member("ops", "david").await.unwrap();
        let still_member = db.is_channel_member("ops", "david").await.unwrap();
        assert!(!still_member);
    }

    #[tokio::test]
    async fn channel_memberships_follow_delete_and_rename() {
        let db = fresh_store().await;
        db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        db.add_channel_member("ops", "david").await.unwrap();

        // Rename migrates memberships to the new channel name.
        db.rename_channel("ops", "operations").await.unwrap();
        let member_of_new = db.is_channel_member("operations", "david").await.unwrap();
        assert!(member_of_new);
        let member_of_old = db.is_channel_member("ops", "david").await.unwrap();
        assert!(!member_of_old);

        // Deleting a channel drops its memberships, so a future same-named channel cannot inherit them.
        db.delete_channel("operations").await.unwrap();
        let leftover = db.list_channel_members("operations").await.unwrap();
        assert!(leftover.is_empty());
    }

    #[tokio::test]
    async fn channel_visibility_can_be_changed() {
        let db = fresh_store().await;
        db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();

        db.set_channel_visibility("ops", Visibility::Public).await.unwrap();

        let channel = db.get_channel("ops").await.unwrap().unwrap();
        assert_eq!(channel.visibility, "public");
    }

    #[tokio::test]
    async fn channel_rename_moves_the_record_and_respects_uniqueness() {
        let db = fresh_store().await;
        db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        db.create_channel("taken", Visibility::Public, "aaron").await.unwrap();

        db.rename_channel("ops", "operations").await.unwrap();
        let under_old_name = db.get_channel("ops").await.unwrap();
        assert!(under_old_name.is_none());
        let under_new_name = db.get_channel("operations").await.unwrap();
        assert!(under_new_name.is_some());

        // Renaming onto an existing name is rejected by the unique index.
        let clash = db.rename_channel("operations", "taken").await;
        assert!(clash.is_err());
    }

    #[tokio::test]
    async fn channel_can_be_deleted_and_listed() {
        let db = fresh_store().await;
        db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        db.create_channel("lobby", Visibility::Public, "aaron").await.unwrap();

        let before = db.list_channels().await.unwrap();
        assert_eq!(before.len(), 2);

        db.delete_channel("ops").await.unwrap();
        let after = db.list_channels().await.unwrap();
        assert_eq!(after.len(), 1);
        assert_eq!(after[0].name, "lobby");
    }

    #[tokio::test]
    async fn invite_uses_can_be_decremented_and_revoked() {
        let db = fresh_store().await;
        db.create_invite("ops", "tok-123", Some(5), None, "aaron").await.unwrap();

        db.set_invite_uses("tok-123", 4).await.unwrap();
        let updated = db.get_invite("tok-123").await.unwrap().unwrap();
        assert_eq!(updated.uses_remaining, Some(4));

        db.delete_invite("tok-123").await.unwrap();
        let revoked = db.get_invite("tok-123").await.unwrap();
        assert!(revoked.is_none());
    }

    #[tokio::test]
    async fn invites_are_dropped_when_the_channel_is_deleted() {
        let db = fresh_store().await;
        db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        db.create_invite("ops", "tok", Some(5), None, "aaron").await.unwrap();

        db.delete_channel("ops").await.unwrap();
        let orphan = db.get_invite("tok").await.unwrap();
        assert!(
            orphan.is_none(),
            "deleting a channel must drop its invites so a future same-named channel cannot honor them"
        );
    }

    #[tokio::test]
    async fn invites_follow_a_channel_rename() {
        let db = fresh_store().await;
        db.create_channel("ops", Visibility::Private, "aaron").await.unwrap();
        db.create_invite("ops", "tok", None, None, "aaron").await.unwrap();

        db.rename_channel("ops", "operations").await.unwrap();
        let invite = db.get_invite("tok").await.unwrap().unwrap();
        assert_eq!(invite.channel, "operations", "an invite must follow its renamed channel");
    }

    #[tokio::test]
    async fn ban_add_remove_and_list_round_trip() {
        let db = fresh_store().await;
        db.create_channel("ops", Visibility::Public, "aaron").await.unwrap();

        db.add_ban("ops", "bob").await.unwrap();
        db.add_ban("ops", "bob").await.unwrap(); // idempotent
        db.add_ban("ops", "mallory").await.unwrap();

        let mut all_bans = db.list_bans().await.unwrap();
        all_bans.sort();
        let expected_both = vec![
            (String::from("ops"), String::from("bob")),
            (String::from("ops"), String::from("mallory")),
        ];
        assert_eq!(all_bans, expected_both);

        db.remove_ban("ops", "bob").await.unwrap();
        let after_unban = db.list_bans().await.unwrap();
        assert_eq!(after_unban, vec![(String::from("ops"), String::from("mallory"))]);
    }

    #[tokio::test]
    async fn bans_follow_delete_and_rename() {
        let db = fresh_store().await;
        db.create_channel("ops", Visibility::Public, "aaron").await.unwrap();
        db.add_ban("ops", "bob").await.unwrap();

        // A rename migrates the ban with the channel.
        db.rename_channel("ops", "operations").await.unwrap();
        let after_rename = db.list_bans().await.unwrap();
        assert_eq!(after_rename, vec![(String::from("operations"), String::from("bob"))]);

        // A delete drops the channel's bans so a future same-named channel starts clean.
        db.delete_channel("operations").await.unwrap();
        let after_delete = db.list_bans().await.unwrap();
        assert!(after_delete.is_empty(), "deleting a channel must drop its bans");
    }

    #[tokio::test]
    async fn users_can_be_listed_and_deleted() {
        let db = fresh_store().await;
        db.create_user("aaron").await.unwrap();
        db.create_user("david").await.unwrap();

        let before = db.list_users().await.unwrap();
        assert_eq!(before.len(), 2);

        db.delete_user("david").await.unwrap();
        let after = db.list_users().await.unwrap();
        assert_eq!(after.len(), 1);
        assert_eq!(after[0].username, "aaron");
    }
}