1use std::num::NonZeroU8;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use axum::response::ErrorResponse;
6use http::StatusCode;
7
8use spacetimedb::client::ClientActorIndex;
9use spacetimedb::energy::{EnergyBalance, EnergyQuanta};
10use spacetimedb::host::{HostController, ModuleHost, NoSuchModule, UpdateDatabaseResult};
11use spacetimedb::identity::{AuthCtx, Identity};
12use spacetimedb::messages::control_db::{Database, HostType, Node, Replica};
13use spacetimedb::sql;
14use spacetimedb_client_api_messages::http::{SqlStmtResult, SqlStmtStats};
15use spacetimedb_client_api_messages::name::{DomainName, InsertDomainResult, RegisterTldResult, SetDomainsResult, Tld};
16use spacetimedb_lib::{ProductTypeElement, ProductValue};
17use spacetimedb_paths::server::ModuleLogsDir;
18use tokio::sync::watch;
19
20pub mod auth;
21pub mod routes;
22pub mod util;
23
24#[async_trait]
30pub trait NodeDelegate: Send + Sync {
31 fn gather_metrics(&self) -> Vec<prometheus::proto::MetricFamily>;
32 fn client_actor_index(&self) -> &ClientActorIndex;
33
34 type JwtAuthProviderT: auth::JwtAuthProvider;
35 fn jwt_auth_provider(&self) -> &Self::JwtAuthProviderT;
36 async fn leader(&self, database_id: u64) -> anyhow::Result<Option<Host>>;
41 fn module_logs_dir(&self, replica_id: u64) -> ModuleLogsDir;
42}
43
44pub struct Host {
46 pub replica_id: u64,
47 host_controller: HostController,
48}
49
50impl Host {
51 pub fn new(replica_id: u64, host_controller: HostController) -> Self {
52 Self {
53 replica_id,
54 host_controller,
55 }
56 }
57
58 pub async fn module(&self) -> Result<ModuleHost, NoSuchModule> {
59 self.host_controller.get_module_host(self.replica_id).await
60 }
61
62 pub async fn module_watcher(&self) -> Result<watch::Receiver<ModuleHost>, NoSuchModule> {
63 self.host_controller.watch_module_host(self.replica_id).await
64 }
65
66 pub async fn exec_sql(
67 &self,
68 auth: AuthCtx,
69 database: Database,
70 body: String,
71 ) -> axum::response::Result<Vec<SqlStmtResult<ProductValue>>> {
72 let module_host = self
73 .module()
74 .await
75 .map_err(|_| (StatusCode::NOT_FOUND, "module not found".to_string()))?;
76
77 let json = self
78 .host_controller
79 .using_database(
80 database,
81 self.replica_id,
82 move |db| -> axum::response::Result<_, (StatusCode, String)> {
83 tracing::info!(sql = body);
84
85 let mut header = vec![];
87
88 let sql_start = std::time::Instant::now();
89 let sql_span =
90 tracing::trace_span!("execute_sql", total_duration = tracing::field::Empty,).entered();
91
92 let result = sql::execute::run(
93 db,
95 &body,
96 auth,
97 Some(&module_host.info().subscriptions),
98 &mut header,
99 )
100 .map_err(|e| {
101 log::warn!("{e}");
102 if let Some(auth_err) = e.get_auth_error() {
103 (StatusCode::UNAUTHORIZED, auth_err.to_string())
104 } else {
105 (StatusCode::BAD_REQUEST, e.to_string())
106 }
107 })?;
108
109 let total_duration = sql_start.elapsed();
110 sql_span.record("total_duration", tracing::field::debug(total_duration));
111
112 let schema = header
114 .into_iter()
115 .map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name)))
116 .collect();
117
118 Ok(vec![SqlStmtResult {
119 schema,
120 rows: result.rows,
121 total_duration_micros: total_duration.as_micros() as u64,
122 stats: SqlStmtStats::from_metrics(&result.metrics),
123 }])
124 },
125 )
126 .await
127 .map_err(log_and_500)??;
128
129 Ok(json)
130 }
131
132 pub async fn update(
133 &self,
134 database: Database,
135 host_type: HostType,
136 program_bytes: Box<[u8]>,
137 ) -> anyhow::Result<UpdateDatabaseResult> {
138 self.host_controller
139 .update_module_host(database, host_type, self.replica_id, program_bytes)
140 .await
141 }
142}
143
144pub struct DatabaseDef {
148 pub database_identity: Identity,
150 pub program_bytes: Vec<u8>,
152 pub num_replicas: Option<NonZeroU8>,
156 pub host_type: HostType,
158}
159
160#[async_trait]
178pub trait ControlStateDelegate: ControlStateReadAccess + ControlStateWriteAccess + Send + Sync {}
179
180impl<T: ControlStateReadAccess + ControlStateWriteAccess + Send + Sync> ControlStateDelegate for T {}
181
182pub trait ControlStateReadAccess {
184 fn get_node_id(&self) -> Option<u64>;
186 fn get_node_by_id(&self, node_id: u64) -> anyhow::Result<Option<Node>>;
187 fn get_nodes(&self) -> anyhow::Result<Vec<Node>>;
188
189 fn get_database_by_id(&self, id: u64) -> anyhow::Result<Option<Database>>;
191 fn get_database_by_identity(&self, database_identity: &Identity) -> anyhow::Result<Option<Database>>;
192 fn get_databases(&self) -> anyhow::Result<Vec<Database>>;
193
194 fn get_replica_by_id(&self, id: u64) -> anyhow::Result<Option<Replica>>;
196 fn get_replicas(&self) -> anyhow::Result<Vec<Replica>>;
197 fn get_leader_replica_by_database(&self, database_id: u64) -> Option<Replica>;
198
199 fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result<Option<EnergyBalance>>;
201
202 fn lookup_identity(&self, domain: &str) -> anyhow::Result<Option<Identity>>;
204 fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result<Vec<DomainName>>;
205}
206
207#[async_trait]
209pub trait ControlStateWriteAccess: Send + Sync {
210 async fn publish_database(
219 &self,
220 publisher: &Identity,
221 spec: DatabaseDef,
222 ) -> anyhow::Result<Option<UpdateDatabaseResult>>;
223
224 async fn delete_database(&self, caller_identity: &Identity, database_identity: &Identity) -> anyhow::Result<()>;
225
226 async fn add_energy(&self, identity: &Identity, amount: EnergyQuanta) -> anyhow::Result<()>;
228 async fn withdraw_energy(&self, identity: &Identity, amount: EnergyQuanta) -> anyhow::Result<()>;
229
230 async fn register_tld(&self, identity: &Identity, tld: Tld) -> anyhow::Result<RegisterTldResult>;
232 async fn create_dns_record(
233 &self,
234 owner_identity: &Identity,
235 domain: &DomainName,
236 database_identity: &Identity,
237 ) -> anyhow::Result<InsertDomainResult>;
238
239 async fn replace_dns_records(
249 &self,
250 database_identity: &Identity,
251 owner_identity: &Identity,
252 domain_names: &[DomainName],
253 ) -> anyhow::Result<SetDomainsResult>;
254}
255
256impl<T: ControlStateReadAccess + ?Sized> ControlStateReadAccess for Arc<T> {
257 fn get_node_id(&self) -> Option<u64> {
259 (**self).get_node_id()
260 }
261 fn get_node_by_id(&self, node_id: u64) -> anyhow::Result<Option<Node>> {
262 (**self).get_node_by_id(node_id)
263 }
264 fn get_nodes(&self) -> anyhow::Result<Vec<Node>> {
265 (**self).get_nodes()
266 }
267
268 fn get_database_by_id(&self, id: u64) -> anyhow::Result<Option<Database>> {
270 (**self).get_database_by_id(id)
271 }
272 fn get_database_by_identity(&self, identity: &Identity) -> anyhow::Result<Option<Database>> {
273 (**self).get_database_by_identity(identity)
274 }
275 fn get_databases(&self) -> anyhow::Result<Vec<Database>> {
276 (**self).get_databases()
277 }
278
279 fn get_replica_by_id(&self, id: u64) -> anyhow::Result<Option<Replica>> {
281 (**self).get_replica_by_id(id)
282 }
283 fn get_replicas(&self) -> anyhow::Result<Vec<Replica>> {
284 (**self).get_replicas()
285 }
286
287 fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result<Option<EnergyBalance>> {
289 (**self).get_energy_balance(identity)
290 }
291
292 fn lookup_identity(&self, domain: &str) -> anyhow::Result<Option<Identity>> {
294 (**self).lookup_identity(domain)
295 }
296
297 fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result<Vec<DomainName>> {
298 (**self).reverse_lookup(database_identity)
299 }
300
301 fn get_leader_replica_by_database(&self, database_id: u64) -> Option<Replica> {
302 (**self).get_leader_replica_by_database(database_id)
303 }
304}
305
306#[async_trait]
307impl<T: ControlStateWriteAccess + ?Sized> ControlStateWriteAccess for Arc<T> {
308 async fn publish_database(
309 &self,
310 identity: &Identity,
311 spec: DatabaseDef,
312 ) -> anyhow::Result<Option<UpdateDatabaseResult>> {
313 (**self).publish_database(identity, spec).await
314 }
315
316 async fn delete_database(&self, caller_identity: &Identity, database_identity: &Identity) -> anyhow::Result<()> {
317 (**self).delete_database(caller_identity, database_identity).await
318 }
319
320 async fn add_energy(&self, identity: &Identity, amount: EnergyQuanta) -> anyhow::Result<()> {
321 (**self).add_energy(identity, amount).await
322 }
323 async fn withdraw_energy(&self, identity: &Identity, amount: EnergyQuanta) -> anyhow::Result<()> {
324 (**self).withdraw_energy(identity, amount).await
325 }
326
327 async fn register_tld(&self, identity: &Identity, tld: Tld) -> anyhow::Result<RegisterTldResult> {
328 (**self).register_tld(identity, tld).await
329 }
330
331 async fn create_dns_record(
332 &self,
333 identity: &Identity,
334 domain: &DomainName,
335 database_identity: &Identity,
336 ) -> anyhow::Result<InsertDomainResult> {
337 (**self).create_dns_record(identity, domain, database_identity).await
338 }
339
340 async fn replace_dns_records(
341 &self,
342 database_identity: &Identity,
343 owner_identity: &Identity,
344 domain_names: &[DomainName],
345 ) -> anyhow::Result<SetDomainsResult> {
346 (**self)
347 .replace_dns_records(database_identity, owner_identity, domain_names)
348 .await
349 }
350}
351
352#[async_trait]
353impl<T: NodeDelegate + ?Sized> NodeDelegate for Arc<T> {
354 type JwtAuthProviderT = T::JwtAuthProviderT;
355 fn gather_metrics(&self) -> Vec<prometheus::proto::MetricFamily> {
356 (**self).gather_metrics()
357 }
358
359 fn client_actor_index(&self) -> &ClientActorIndex {
360 (**self).client_actor_index()
361 }
362
363 fn jwt_auth_provider(&self) -> &Self::JwtAuthProviderT {
364 (**self).jwt_auth_provider()
365 }
366
367 async fn leader(&self, database_id: u64) -> anyhow::Result<Option<Host>> {
368 (**self).leader(database_id).await
369 }
370
371 fn module_logs_dir(&self, replica_id: u64) -> ModuleLogsDir {
372 (**self).module_logs_dir(replica_id)
373 }
374}
375
376pub fn log_and_500(e: impl std::fmt::Display) -> ErrorResponse {
377 log::error!("internal error: {e:#}");
378 (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")).into()
379}