spacetimedb_client_api/
lib.rs

1use std::num::NonZeroU8;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use axum::response::ErrorResponse;
6use http::StatusCode;
7
8use spacetimedb::client::ClientActorIndex;
9use spacetimedb::energy::{EnergyBalance, EnergyQuanta};
10use spacetimedb::host::{HostController, ModuleHost, NoSuchModule, UpdateDatabaseResult};
11use spacetimedb::identity::{AuthCtx, Identity};
12use spacetimedb::messages::control_db::{Database, HostType, Node, Replica};
13use spacetimedb::sql;
14use spacetimedb_client_api_messages::http::{SqlStmtResult, SqlStmtStats};
15use spacetimedb_client_api_messages::name::{DomainName, InsertDomainResult, RegisterTldResult, SetDomainsResult, Tld};
16use spacetimedb_lib::{ProductTypeElement, ProductValue};
17use spacetimedb_paths::server::ModuleLogsDir;
18use tokio::sync::watch;
19
20pub mod auth;
21pub mod routes;
22pub mod util;
23
/// Defines the state / environment of a SpacetimeDB node from the PoV of the
/// client API.
///
/// Types returned here should be considered internal state and **never** be
/// surfaced to the API.
///
/// Implementors must be `Send + Sync`. The trait goes through `#[async_trait]`
/// because [`NodeDelegate::leader`] is an `async fn`.
#[async_trait]
pub trait NodeDelegate: Send + Sync {
    /// Return metric families in the Prometheus protobuf representation.
    ///
    /// NOTE(review): which registries / metrics are gathered is decided by the
    /// implementor and is not visible from this trait.
    fn gather_metrics(&self) -> Vec<prometheus::proto::MetricFamily>;
    /// Borrow the [`ClientActorIndex`] held by this delegate.
    fn client_actor_index(&self) -> &ClientActorIndex;

    /// The concrete [`auth::JwtAuthProvider`] implementation exposed by
    /// [`NodeDelegate::jwt_auth_provider`].
    type JwtAuthProviderT: auth::JwtAuthProvider;
    /// Borrow the JWT auth provider held by this delegate.
    fn jwt_auth_provider(&self) -> &Self::JwtAuthProviderT;
    /// Return the leader [`Host`] of `database_id`.
    ///
    /// Returns `None` if the current leader is not hosted by this node.
    /// The [`Host`] is spawned implicitly if not already running.
    async fn leader(&self, database_id: u64) -> anyhow::Result<Option<Host>>;
    /// Return the [`ModuleLogsDir`] associated with the replica `replica_id`.
    ///
    /// NOTE(review): presumably the directory the replica's module logs are
    /// written to -- confirm against `spacetimedb_paths`.
    fn module_logs_dir(&self, replica_id: u64) -> ModuleLogsDir;
}
43
/// Client view of a running module.
///
/// A `Host` pairs a replica id with the [`HostController`] that manages it.
/// Every method on `Host` forwards to the controller, passing `replica_id`.
pub struct Host {
    /// Id of the replica this handle addresses. It is handed to every
    /// [`HostController`] call made through this `Host`.
    pub replica_id: u64,
    /// Controller that all operations are delegated to. Private; set via
    /// [`Host::new`].
    host_controller: HostController,
}
49
50impl Host {
51    pub fn new(replica_id: u64, host_controller: HostController) -> Self {
52        Self {
53            replica_id,
54            host_controller,
55        }
56    }
57
58    pub async fn module(&self) -> Result<ModuleHost, NoSuchModule> {
59        self.host_controller.get_module_host(self.replica_id).await
60    }
61
62    pub async fn module_watcher(&self) -> Result<watch::Receiver<ModuleHost>, NoSuchModule> {
63        self.host_controller.watch_module_host(self.replica_id).await
64    }
65
66    pub async fn exec_sql(
67        &self,
68        auth: AuthCtx,
69        database: Database,
70        body: String,
71    ) -> axum::response::Result<Vec<SqlStmtResult<ProductValue>>> {
72        let module_host = self
73            .module()
74            .await
75            .map_err(|_| (StatusCode::NOT_FOUND, "module not found".to_string()))?;
76
77        let json = self
78            .host_controller
79            .using_database(
80                database,
81                self.replica_id,
82                move |db| -> axum::response::Result<_, (StatusCode, String)> {
83                    tracing::info!(sql = body);
84
85                    // We need a header for query results
86                    let mut header = vec![];
87
88                    let sql_start = std::time::Instant::now();
89                    let sql_span =
90                        tracing::trace_span!("execute_sql", total_duration = tracing::field::Empty,).entered();
91
92                    let result = sql::execute::run(
93                        // Returns an empty result set for mutations
94                        db,
95                        &body,
96                        auth,
97                        Some(&module_host.info().subscriptions),
98                        &mut header,
99                    )
100                    .map_err(|e| {
101                        log::warn!("{e}");
102                        if let Some(auth_err) = e.get_auth_error() {
103                            (StatusCode::UNAUTHORIZED, auth_err.to_string())
104                        } else {
105                            (StatusCode::BAD_REQUEST, e.to_string())
106                        }
107                    })?;
108
109                    let total_duration = sql_start.elapsed();
110                    sql_span.record("total_duration", tracing::field::debug(total_duration));
111
112                    // Turn the header into a `ProductType`
113                    let schema = header
114                        .into_iter()
115                        .map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name)))
116                        .collect();
117
118                    Ok(vec![SqlStmtResult {
119                        schema,
120                        rows: result.rows,
121                        total_duration_micros: total_duration.as_micros() as u64,
122                        stats: SqlStmtStats::from_metrics(&result.metrics),
123                    }])
124                },
125            )
126            .await
127            .map_err(log_and_500)??;
128
129        Ok(json)
130    }
131
132    pub async fn update(
133        &self,
134        database: Database,
135        host_type: HostType,
136        program_bytes: Box<[u8]>,
137    ) -> anyhow::Result<UpdateDatabaseResult> {
138        self.host_controller
139            .update_module_host(database, host_type, self.replica_id, program_bytes)
140            .await
141    }
142}
143
/// Parameters for publishing a database.
///
/// See [`ControlStateWriteAccess::publish_database`].
pub struct DatabaseDef {
    /// The [`Identity`] the database shall have.
    pub database_identity: Identity,
    /// The compiled program of the database module.
    pub program_bytes: Vec<u8>,
    /// The desired number of replicas the database shall have.
    ///
    /// If `None`, the edition default is used.
    pub num_replicas: Option<NonZeroU8>,
    /// The host type of the supplied program.
    pub host_type: HostType,
}
159
/// API of the SpacetimeDB control plane.
///
/// The trait is the composition of [`ControlStateReadAccess`] and
/// [`ControlStateWriteAccess`] to reflect the consistency model of SpacetimeDB
/// as of this writing:
///
/// The control plane state represents the _desired_ state of an ensemble of
/// SpacetimeDB nodes. As such, this state can be read from a local (in-memory)
/// representation, which is guaranteed to be "prefix consistent" across all
/// nodes of a cluster. Prefix consistency means that the state being examined
/// is consistent, but reads may not return the most recently written values.
///
/// As a consequence, implementations are not currently required to guarantee
/// read-after-write consistency. In the future, however, write operations may
/// be required to return the observed state after completing. As this may
/// require them to suspend themselves while waiting for the writes to propagate,
/// [`ControlStateWriteAccess`] methods are marked `async` today already.
///
/// The trait declares no methods of its own. It is implemented automatically
/// for every type that satisfies its supertrait bounds, so it never needs to
/// be implemented by hand.
#[async_trait]
pub trait ControlStateDelegate: ControlStateReadAccess + ControlStateWriteAccess + Send + Sync {}
179
180impl<T: ControlStateReadAccess + ControlStateWriteAccess + Send + Sync> ControlStateDelegate for T {}
181
/// Query API of the SpacetimeDB control plane.
///
/// All methods are synchronous. Most lookups return
/// `anyhow::Result<Option<_>>`; the `Option` is presumably `None` when no
/// matching record exists -- confirm against implementors.
pub trait ControlStateReadAccess {
    // Nodes
    /// Return a node id, if one is available.
    ///
    /// NOTE(review): presumably the id of the local node -- confirm.
    fn get_node_id(&self) -> Option<u64>;
    /// Look up the [`Node`] with id `node_id`.
    fn get_node_by_id(&self, node_id: u64) -> anyhow::Result<Option<Node>>;
    /// Return all known [`Node`]s.
    fn get_nodes(&self) -> anyhow::Result<Vec<Node>>;

    // Databases
    /// Look up the [`Database`] with the numeric id `id`.
    fn get_database_by_id(&self, id: u64) -> anyhow::Result<Option<Database>>;
    /// Look up the [`Database`] with the [`Identity`] `database_identity`.
    fn get_database_by_identity(&self, database_identity: &Identity) -> anyhow::Result<Option<Database>>;
    /// Return all known [`Database`]s.
    fn get_databases(&self) -> anyhow::Result<Vec<Database>>;

    // Replicas
    /// Look up the [`Replica`] with id `id`.
    fn get_replica_by_id(&self, id: u64) -> anyhow::Result<Option<Replica>>;
    /// Return all known [`Replica`]s.
    fn get_replicas(&self) -> anyhow::Result<Vec<Replica>>;
    /// Return the leader [`Replica`] of the database with id `database_id`.
    ///
    /// Unlike the other lookups, the signature has no error channel: any
    /// failure can only be reported as `None`.
    fn get_leader_replica_by_database(&self, database_id: u64) -> Option<Replica>;

    // Energy
    /// Return the [`EnergyBalance`] recorded for `identity`, if any.
    fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result<Option<EnergyBalance>>;

    // DNS
    /// Resolve the domain name `domain` to a database [`Identity`].
    ///
    /// Note that `domain` is a plain `&str`, not a [`DomainName`].
    fn lookup_identity(&self, domain: &str) -> anyhow::Result<Option<Identity>>;
    /// Return the [`DomainName`]s registered for `database_identity`.
    fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result<Vec<DomainName>>;
}
206
/// Write operations on the SpacetimeDB control plane.
///
/// All methods are `async` (via `#[async_trait]`) and implementors must be
/// `Send + Sync`.
#[async_trait]
pub trait ControlStateWriteAccess: Send + Sync {
    /// Publish a database according to [`DatabaseDef`].
    ///
    /// If the database with the given identity was successfully published before,
    /// it is updated according to the module lifecycle conventions. `Some` result is
    /// returned in that case.
    ///
    /// Otherwise, `None` is returned meaning that the database was freshly
    /// initialized.
    async fn publish_database(
        &self,
        publisher: &Identity,
        spec: DatabaseDef,
    ) -> anyhow::Result<Option<UpdateDatabaseResult>>;

    /// Delete the database `database_identity` on behalf of `caller_identity`.
    ///
    /// NOTE(review): presumably implementors check that `caller_identity` is
    /// allowed to delete the database -- confirm against implementors.
    async fn delete_database(&self, caller_identity: &Identity, database_identity: &Identity) -> anyhow::Result<()>;

    // Energy
    /// Add `amount` of energy to the balance of `identity`.
    async fn add_energy(&self, identity: &Identity, amount: EnergyQuanta) -> anyhow::Result<()>;
    /// Withdraw `amount` of energy from the balance of `identity`.
    ///
    /// NOTE(review): behavior on insufficient balance is implementor-defined
    /// and not visible here.
    async fn withdraw_energy(&self, identity: &Identity, amount: EnergyQuanta) -> anyhow::Result<()>;

    // DNS
    /// Register the top-level domain `tld` for `identity`.
    async fn register_tld(&self, identity: &Identity, tld: Tld) -> anyhow::Result<RegisterTldResult>;
    /// Create a dns record mapping `domain` to `database_identity`, on behalf
    /// of `owner_identity`.
    ///
    /// Beware the argument order: here `owner_identity` comes first and
    /// `database_identity` last, whereas `replace_dns_records` takes
    /// `database_identity` first. Both are `&Identity`, so swapping them is
    /// not caught by the compiler.
    async fn create_dns_record(
        &self,
        owner_identity: &Identity,
        domain: &DomainName,
        database_identity: &Identity,
    ) -> anyhow::Result<InsertDomainResult>;

    /// Replace all dns records pointing to `database_identity` with `domain_names`.
    ///
    /// All existing names in the database and in `domain_names` must be
    /// owned by `owner_identity` (i.e. their TLD must belong to `owner_identity`).
    ///
    /// The `owner_identity` is typically also the owner of the database.
    ///
    /// Note that passing an empty slice is legal, and will just remove any
    /// existing dns records.
    async fn replace_dns_records(
        &self,
        database_identity: &Identity,
        owner_identity: &Identity,
        domain_names: &[DomainName],
    ) -> anyhow::Result<SetDomainsResult>;
}
255
256impl<T: ControlStateReadAccess + ?Sized> ControlStateReadAccess for Arc<T> {
257    // Nodes
258    fn get_node_id(&self) -> Option<u64> {
259        (**self).get_node_id()
260    }
261    fn get_node_by_id(&self, node_id: u64) -> anyhow::Result<Option<Node>> {
262        (**self).get_node_by_id(node_id)
263    }
264    fn get_nodes(&self) -> anyhow::Result<Vec<Node>> {
265        (**self).get_nodes()
266    }
267
268    // Databases
269    fn get_database_by_id(&self, id: u64) -> anyhow::Result<Option<Database>> {
270        (**self).get_database_by_id(id)
271    }
272    fn get_database_by_identity(&self, identity: &Identity) -> anyhow::Result<Option<Database>> {
273        (**self).get_database_by_identity(identity)
274    }
275    fn get_databases(&self) -> anyhow::Result<Vec<Database>> {
276        (**self).get_databases()
277    }
278
279    // Replicas
280    fn get_replica_by_id(&self, id: u64) -> anyhow::Result<Option<Replica>> {
281        (**self).get_replica_by_id(id)
282    }
283    fn get_replicas(&self) -> anyhow::Result<Vec<Replica>> {
284        (**self).get_replicas()
285    }
286
287    // Energy
288    fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result<Option<EnergyBalance>> {
289        (**self).get_energy_balance(identity)
290    }
291
292    // DNS
293    fn lookup_identity(&self, domain: &str) -> anyhow::Result<Option<Identity>> {
294        (**self).lookup_identity(domain)
295    }
296
297    fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result<Vec<DomainName>> {
298        (**self).reverse_lookup(database_identity)
299    }
300
301    fn get_leader_replica_by_database(&self, database_id: u64) -> Option<Replica> {
302        (**self).get_leader_replica_by_database(database_id)
303    }
304}
305
306#[async_trait]
307impl<T: ControlStateWriteAccess + ?Sized> ControlStateWriteAccess for Arc<T> {
308    async fn publish_database(
309        &self,
310        identity: &Identity,
311        spec: DatabaseDef,
312    ) -> anyhow::Result<Option<UpdateDatabaseResult>> {
313        (**self).publish_database(identity, spec).await
314    }
315
316    async fn delete_database(&self, caller_identity: &Identity, database_identity: &Identity) -> anyhow::Result<()> {
317        (**self).delete_database(caller_identity, database_identity).await
318    }
319
320    async fn add_energy(&self, identity: &Identity, amount: EnergyQuanta) -> anyhow::Result<()> {
321        (**self).add_energy(identity, amount).await
322    }
323    async fn withdraw_energy(&self, identity: &Identity, amount: EnergyQuanta) -> anyhow::Result<()> {
324        (**self).withdraw_energy(identity, amount).await
325    }
326
327    async fn register_tld(&self, identity: &Identity, tld: Tld) -> anyhow::Result<RegisterTldResult> {
328        (**self).register_tld(identity, tld).await
329    }
330
331    async fn create_dns_record(
332        &self,
333        identity: &Identity,
334        domain: &DomainName,
335        database_identity: &Identity,
336    ) -> anyhow::Result<InsertDomainResult> {
337        (**self).create_dns_record(identity, domain, database_identity).await
338    }
339
340    async fn replace_dns_records(
341        &self,
342        database_identity: &Identity,
343        owner_identity: &Identity,
344        domain_names: &[DomainName],
345    ) -> anyhow::Result<SetDomainsResult> {
346        (**self)
347            .replace_dns_records(database_identity, owner_identity, domain_names)
348            .await
349    }
350}
351
352#[async_trait]
353impl<T: NodeDelegate + ?Sized> NodeDelegate for Arc<T> {
354    type JwtAuthProviderT = T::JwtAuthProviderT;
355    fn gather_metrics(&self) -> Vec<prometheus::proto::MetricFamily> {
356        (**self).gather_metrics()
357    }
358
359    fn client_actor_index(&self) -> &ClientActorIndex {
360        (**self).client_actor_index()
361    }
362
363    fn jwt_auth_provider(&self) -> &Self::JwtAuthProviderT {
364        (**self).jwt_auth_provider()
365    }
366
367    async fn leader(&self, database_id: u64) -> anyhow::Result<Option<Host>> {
368        (**self).leader(database_id).await
369    }
370
371    fn module_logs_dir(&self, replica_id: u64) -> ModuleLogsDir {
372        (**self).module_logs_dir(replica_id)
373    }
374}
375
376pub fn log_and_500(e: impl std::fmt::Display) -> ErrorResponse {
377    log::error!("internal error: {e:#}");
378    (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")).into()
379}