spacetimedb_client_api/routes/
database.rs

1use std::num::NonZeroU8;
2use std::str::FromStr;
3use std::time::Duration;
4
5use crate::auth::{
6    anon_auth_middleware, SpacetimeAuth, SpacetimeEnergyUsed, SpacetimeExecutionDurationMicros, SpacetimeIdentity,
7    SpacetimeIdentityToken,
8};
9use crate::routes::subscribe::generate_random_connection_id;
10use crate::util::{ByteStringBody, NameOrIdentity};
11use crate::{log_and_500, ControlStateDelegate, DatabaseDef, NodeDelegate};
12use axum::body::{Body, Bytes};
13use axum::extract::{Path, Query, State};
14use axum::response::{ErrorResponse, IntoResponse};
15use axum::routing::MethodRouter;
16use axum::Extension;
17use axum_extra::TypedHeader;
18use futures::StreamExt;
19use http::StatusCode;
20use serde::Deserialize;
21use spacetimedb::database_logger::DatabaseLogger;
22use spacetimedb::host::module_host::ClientConnectedError;
23use spacetimedb::host::ReducerArgs;
24use spacetimedb::host::ReducerCallError;
25use spacetimedb::host::ReducerOutcome;
26use spacetimedb::host::UpdateDatabaseResult;
27use spacetimedb::identity::Identity;
28use spacetimedb::messages::control_db::{Database, HostType};
29use spacetimedb_client_api_messages::name::{self, DatabaseName, DomainName, PublishOp, PublishResult};
30use spacetimedb_lib::db::raw_def::v9::RawModuleDefV9;
31use spacetimedb_lib::identity::AuthCtx;
32use spacetimedb_lib::{sats, Timestamp};
33
34use super::subscribe::{handle_websocket, HasWebSocketOptions};
35
36#[derive(Deserialize)]
37pub struct CallParams {
38    name_or_identity: NameOrIdentity,
39    reducer: String,
40}
41
42pub const NO_SUCH_DATABASE: (StatusCode, &str) = (StatusCode::NOT_FOUND, "No such database.");
43
44pub async fn call<S: ControlStateDelegate + NodeDelegate>(
45    State(worker_ctx): State<S>,
46    Extension(auth): Extension<SpacetimeAuth>,
47    Path(CallParams {
48        name_or_identity,
49        reducer,
50    }): Path<CallParams>,
51    TypedHeader(content_type): TypedHeader<headers::ContentType>,
52    ByteStringBody(body): ByteStringBody,
53) -> axum::response::Result<impl IntoResponse> {
54    if content_type != headers::ContentType::json() {
55        return Err(axum::extract::rejection::MissingJsonContentType::default().into());
56    }
57    let caller_identity = auth.identity;
58
59    let args = ReducerArgs::Json(body);
60
61    let db_identity = name_or_identity.resolve(&worker_ctx).await?;
62    let database = worker_ctx_find_database(&worker_ctx, &db_identity)
63        .await?
64        .ok_or_else(|| {
65            log::error!("Could not find database: {}", db_identity.to_hex());
66            NO_SUCH_DATABASE
67        })?;
68    let identity = database.owner_identity;
69
70    let leader = worker_ctx
71        .leader(database.id)
72        .await
73        .map_err(log_and_500)?
74        .ok_or(StatusCode::NOT_FOUND)?;
75    let module = leader.module().await.map_err(log_and_500)?;
76
77    // HTTP callers always need a connection ID to provide to connect/disconnect,
78    // so generate one.
79    let connection_id = generate_random_connection_id();
80
81    match module.call_identity_connected(caller_identity, connection_id).await {
82        // If `call_identity_connected` returns `Err(Rejected)`, then the `client_connected` reducer errored,
83        // meaning the connection was refused. Return 403 forbidden.
84        Err(ClientConnectedError::Rejected(msg)) => return Err((StatusCode::FORBIDDEN, msg).into()),
85        // If `call_identity_connected` returns `Err(OutOfEnergy)`,
86        // then, well, the database is out of energy.
87        // Return 503 service unavailable.
88        Err(err @ ClientConnectedError::OutOfEnergy) => {
89            return Err((StatusCode::SERVICE_UNAVAILABLE, err.to_string()).into())
90        }
91        // If `call_identity_connected` returns `Err(ReducerCall)`,
92        // something went wrong while invoking the `client_connected` reducer.
93        // I (pgoldman 2025-03-27) am not really sure how this would happen,
94        // but we returned 404 not found in this case prior to my editing this code,
95        // so I guess let's keep doing that.
96        Err(ClientConnectedError::ReducerCall(e)) => {
97            return Err((StatusCode::NOT_FOUND, format!("{:#}", anyhow::anyhow!(e))).into())
98        }
99        // If `call_identity_connected` returns `Err(DBError)`,
100        // then the module didn't define `client_connected`,
101        // but something went wrong when we tried to insert into `st_client`.
102        // That's weird and scary, so return 500 internal error.
103        Err(e @ ClientConnectedError::DBError(_)) => {
104            return Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into())
105        }
106
107        // If `call_identity_connected` returns `Ok`, then we can actually call the reducer we want.
108        Ok(()) => (),
109    }
110    let result = match module
111        .call_reducer(caller_identity, Some(connection_id), None, None, None, &reducer, args)
112        .await
113    {
114        Ok(rcr) => Ok(rcr),
115        Err(e) => {
116            let status_code = match e {
117                ReducerCallError::Args(_) => {
118                    log::debug!("Attempt to call reducer with invalid arguments");
119                    StatusCode::BAD_REQUEST
120                }
121                ReducerCallError::NoSuchModule(_) | ReducerCallError::ScheduleReducerNotFound => StatusCode::NOT_FOUND,
122                ReducerCallError::NoSuchReducer => {
123                    log::debug!("Attempt to call non-existent reducer {reducer}");
124                    StatusCode::NOT_FOUND
125                }
126                ReducerCallError::LifecycleReducer(lifecycle) => {
127                    log::debug!("Attempt to call {lifecycle:?} lifecycle reducer {reducer}");
128                    StatusCode::BAD_REQUEST
129                }
130            };
131
132            log::debug!("Error while invoking reducer {e:#}");
133            Err((status_code, format!("{:#}", anyhow::anyhow!(e))))
134        }
135    };
136
137    if let Err(e) = module.call_identity_disconnected(caller_identity, connection_id).await {
138        // If `call_identity_disconnected` errors, something is very wrong:
139        // it means we tried to delete the `st_client` row but failed.
140        // Note that `call_identity_disconnected` swallows errors from the `client_disconnected` reducer.
141        // Slap a 500 on it and pray.
142        return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("{:#}", anyhow::anyhow!(e))).into());
143    }
144
145    match result {
146        Ok(result) => {
147            let (status, body) = reducer_outcome_response(&identity, &reducer, result.outcome);
148            Ok((
149                status,
150                TypedHeader(SpacetimeEnergyUsed(result.energy_used)),
151                TypedHeader(SpacetimeExecutionDurationMicros(result.execution_duration)),
152                body,
153            ))
154        }
155        Err(e) => Err((e.0, e.1).into()),
156    }
157}
158
159fn reducer_outcome_response(identity: &Identity, reducer: &str, outcome: ReducerOutcome) -> (StatusCode, String) {
160    match outcome {
161        ReducerOutcome::Committed => (StatusCode::OK, "".to_owned()),
162        ReducerOutcome::Failed(errmsg) => {
163            // TODO: different status code? this is what cloudflare uses, sorta
164            (StatusCode::from_u16(530).unwrap(), errmsg)
165        }
166        ReducerOutcome::BudgetExceeded => {
167            log::warn!("Node's energy budget exceeded for identity: {identity} while executing {reducer}");
168            (
169                StatusCode::PAYMENT_REQUIRED,
170                "Module energy budget exhausted.".to_owned(),
171            )
172        }
173    }
174}
175
176#[derive(Debug, derive_more::From)]
177pub enum DBCallErr {
178    HandlerError(ErrorResponse),
179    NoSuchDatabase,
180    InstanceNotScheduled,
181}
182
183#[derive(Deserialize)]
184pub struct SchemaParams {
185    name_or_identity: NameOrIdentity,
186}
187#[derive(Deserialize)]
188pub struct SchemaQueryParams {
189    version: SchemaVersion,
190}
191
192#[derive(Deserialize)]
193enum SchemaVersion {
194    #[serde(rename = "9")]
195    V9,
196}
197
198pub async fn schema<S>(
199    State(worker_ctx): State<S>,
200    Path(SchemaParams { name_or_identity }): Path<SchemaParams>,
201    Query(SchemaQueryParams { version }): Query<SchemaQueryParams>,
202    Extension(auth): Extension<SpacetimeAuth>,
203) -> axum::response::Result<impl IntoResponse>
204where
205    S: ControlStateDelegate + NodeDelegate,
206{
207    let db_identity = name_or_identity.resolve(&worker_ctx).await?;
208    let database = worker_ctx_find_database(&worker_ctx, &db_identity)
209        .await?
210        .ok_or(NO_SUCH_DATABASE)?;
211
212    let leader = worker_ctx
213        .leader(database.id)
214        .await
215        .map_err(log_and_500)?
216        .ok_or(StatusCode::NOT_FOUND)?;
217    let module = leader.module().await.map_err(log_and_500)?;
218
219    let module_def = &module.info.module_def;
220    let response_json = match version {
221        SchemaVersion::V9 => {
222            let raw = RawModuleDefV9::from(module_def.clone());
223            axum::Json(sats::serde::SerdeWrapper(raw)).into_response()
224        }
225    };
226
227    Ok((
228        TypedHeader(SpacetimeIdentity(auth.identity)),
229        TypedHeader(SpacetimeIdentityToken(auth.creds)),
230        response_json,
231    ))
232}
233
234#[derive(Deserialize)]
235pub struct DatabaseParam {
236    name_or_identity: NameOrIdentity,
237}
238
239#[derive(sats::Serialize)]
240struct DatabaseResponse {
241    database_identity: Identity,
242    owner_identity: Identity,
243    host_type: HostType,
244    initial_program: spacetimedb_lib::Hash,
245}
246
247impl From<Database> for DatabaseResponse {
248    fn from(db: Database) -> Self {
249        DatabaseResponse {
250            database_identity: db.database_identity,
251            owner_identity: db.owner_identity,
252            host_type: db.host_type,
253            initial_program: db.initial_program,
254        }
255    }
256}
257
258pub async fn db_info<S: ControlStateDelegate>(
259    State(worker_ctx): State<S>,
260    Path(DatabaseParam { name_or_identity }): Path<DatabaseParam>,
261) -> axum::response::Result<impl IntoResponse> {
262    log::trace!("Trying to resolve database identity: {name_or_identity:?}");
263    let database_identity = name_or_identity.resolve(&worker_ctx).await?;
264    log::trace!("Resolved identity to: {database_identity:?}");
265    let database = worker_ctx_find_database(&worker_ctx, &database_identity)
266        .await?
267        .ok_or(NO_SUCH_DATABASE)?;
268    log::trace!("Fetched database from the worker db for database identity: {database_identity:?}");
269
270    let response = DatabaseResponse::from(database);
271    Ok(axum::Json(sats::serde::SerdeWrapper(response)))
272}
273
274#[derive(Deserialize)]
275pub struct LogsParams {
276    name_or_identity: NameOrIdentity,
277}
278
279#[derive(Deserialize)]
280pub struct LogsQuery {
281    num_lines: Option<u32>,
282    #[serde(default)]
283    follow: bool,
284}
285
286pub async fn logs<S>(
287    State(worker_ctx): State<S>,
288    Path(LogsParams { name_or_identity }): Path<LogsParams>,
289    Query(LogsQuery { num_lines, follow }): Query<LogsQuery>,
290    Extension(auth): Extension<SpacetimeAuth>,
291) -> axum::response::Result<impl IntoResponse>
292where
293    S: ControlStateDelegate + NodeDelegate,
294{
295    // You should not be able to read the logs from a database that you do not own
296    // so, unless you are the owner, this will fail.
297
298    let database_identity: Identity = name_or_identity.resolve(&worker_ctx).await?;
299    let database = worker_ctx_find_database(&worker_ctx, &database_identity)
300        .await?
301        .ok_or(NO_SUCH_DATABASE)?;
302
303    if database.owner_identity != auth.identity {
304        return Err((
305            StatusCode::BAD_REQUEST,
306            format!(
307                "Identity does not own database, expected: {} got: {}",
308                database.owner_identity.to_hex(),
309                auth.identity.to_hex()
310            ),
311        )
312            .into());
313    }
314
315    let replica = worker_ctx
316        .get_leader_replica_by_database(database.id)
317        .ok_or((StatusCode::NOT_FOUND, "Replica not scheduled to this node yet."))?;
318    let replica_id = replica.id;
319
320    let logs_dir = worker_ctx.module_logs_dir(replica_id);
321    let lines = DatabaseLogger::read_latest(logs_dir, num_lines).await;
322
323    let body = if follow {
324        let leader = worker_ctx
325            .leader(database.id)
326            .await
327            .map_err(log_and_500)?
328            .ok_or(StatusCode::NOT_FOUND)?;
329        let log_rx = leader
330            .module()
331            .await
332            .map_err(log_and_500)?
333            .subscribe_to_logs()
334            .map_err(log_and_500)?;
335
336        let stream = tokio_stream::wrappers::BroadcastStream::new(log_rx).filter_map(move |x| {
337            std::future::ready(match x {
338                Ok(log) => Some(log),
339                Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(skipped)) => {
340                    log::trace!(
341                        "Skipped {} lines in log for module {}",
342                        skipped,
343                        database_identity.to_hex()
344                    );
345                    None
346                }
347            })
348        });
349
350        let stream = futures::stream::once(std::future::ready(lines.into()))
351            .chain(stream)
352            .map(Ok::<_, std::convert::Infallible>);
353
354        Body::from_stream(stream)
355    } else {
356        Body::from(lines)
357    };
358
359    Ok((
360        TypedHeader(headers::CacheControl::new().with_no_cache()),
361        TypedHeader(headers::ContentType::from(mime_ndjson())),
362        body,
363    ))
364}
365
366fn mime_ndjson() -> mime::Mime {
367    "application/x-ndjson".parse().unwrap()
368}
369
370pub(crate) async fn worker_ctx_find_database(
371    worker_ctx: &(impl ControlStateDelegate + ?Sized),
372    database_identity: &Identity,
373) -> axum::response::Result<Option<Database>> {
374    worker_ctx
375        .get_database_by_identity(database_identity)
376        .map_err(log_and_500)
377}
378
379#[derive(Deserialize)]
380pub struct SqlParams {
381    name_or_identity: NameOrIdentity,
382}
383
384#[derive(Deserialize)]
385pub struct SqlQueryParams {}
386
387pub async fn sql<S>(
388    State(worker_ctx): State<S>,
389    Path(SqlParams { name_or_identity }): Path<SqlParams>,
390    Query(SqlQueryParams {}): Query<SqlQueryParams>,
391    Extension(auth): Extension<SpacetimeAuth>,
392    body: String,
393) -> axum::response::Result<impl IntoResponse>
394where
395    S: NodeDelegate + ControlStateDelegate,
396{
397    // Anyone is authorized to execute SQL queries. The SQL engine will determine
398    // which queries this identity is allowed to execute against the database.
399
400    let db_identity = name_or_identity.resolve(&worker_ctx).await?;
401    let database = worker_ctx_find_database(&worker_ctx, &db_identity)
402        .await?
403        .ok_or(NO_SUCH_DATABASE)?;
404
405    let auth = AuthCtx::new(database.owner_identity, auth.identity);
406    log::debug!("auth: {auth:?}");
407
408    let host = worker_ctx
409        .leader(database.id)
410        .await
411        .map_err(log_and_500)?
412        .ok_or(StatusCode::NOT_FOUND)?;
413    let json = host.exec_sql(auth, database, body).await?;
414
415    let total_duration = json.iter().fold(0, |acc, x| acc + x.total_duration_micros);
416
417    Ok((
418        TypedHeader(SpacetimeExecutionDurationMicros(Duration::from_micros(total_duration))),
419        axum::Json(json),
420    ))
421}
422
423#[derive(Deserialize)]
424pub struct DNSParams {
425    name_or_identity: NameOrIdentity,
426}
427
428#[derive(Deserialize)]
429pub struct ReverseDNSParams {
430    name_or_identity: NameOrIdentity,
431}
432
433#[derive(Deserialize)]
434pub struct DNSQueryParams {}
435
436pub async fn get_identity<S: ControlStateDelegate>(
437    State(ctx): State<S>,
438    Path(DNSParams { name_or_identity }): Path<DNSParams>,
439    Query(DNSQueryParams {}): Query<DNSQueryParams>,
440) -> axum::response::Result<impl IntoResponse> {
441    let identity = name_or_identity.resolve(&ctx).await?;
442    Ok(identity.to_string())
443}
444
445pub async fn get_names<S: ControlStateDelegate>(
446    State(ctx): State<S>,
447    Path(ReverseDNSParams { name_or_identity }): Path<ReverseDNSParams>,
448) -> axum::response::Result<impl IntoResponse> {
449    let database_identity = name_or_identity.resolve(&ctx).await?;
450
451    let names = ctx
452        .reverse_lookup(&database_identity)
453        .map_err(log_and_500)?
454        .into_iter()
455        .filter_map(|x| String::from(x).try_into().ok())
456        .collect();
457
458    let response = name::GetNamesResponse { names };
459    Ok(axum::Json(response))
460}
461
462#[derive(Deserialize)]
463pub struct PublishDatabaseParams {
464    name_or_identity: Option<NameOrIdentity>,
465}
466
467#[derive(Deserialize)]
468pub struct PublishDatabaseQueryParams {
469    #[serde(default)]
470    clear: bool,
471    num_replicas: Option<usize>,
472}
473
474use std::env;
475fn require_spacetime_auth_for_creation() -> bool {
476    env::var("TEMP_REQUIRE_SPACETIME_AUTH").is_ok_and(|v| !v.is_empty())
477}
478
479// A hacky function to let us restrict database creation on maincloud.
480fn allow_creation(auth: &SpacetimeAuth) -> Result<(), ErrorResponse> {
481    if !require_spacetime_auth_for_creation() {
482        return Ok(());
483    }
484    if auth.issuer.trim_end_matches('/') == "https://auth.spacetimedb.com" {
485        Ok(())
486    } else {
487        log::trace!("Rejecting creation request because auth issuer is {}", auth.issuer);
488        Err((
489            StatusCode::UNAUTHORIZED,
490            "To create a database, you must be logged in with a SpacetimeDB account.",
491        )
492            .into())
493    }
494}
495
496pub async fn publish<S: NodeDelegate + ControlStateDelegate>(
497    State(ctx): State<S>,
498    Path(PublishDatabaseParams { name_or_identity }): Path<PublishDatabaseParams>,
499    Query(PublishDatabaseQueryParams { clear, num_replicas }): Query<PublishDatabaseQueryParams>,
500    Extension(auth): Extension<SpacetimeAuth>,
501    body: Bytes,
502) -> axum::response::Result<axum::Json<PublishResult>> {
503    // You should not be able to publish to a database that you do not own
504    // so, unless you are the owner, this will fail.
505
506    let (database_identity, db_name) = match &name_or_identity {
507        Some(noa) => match noa.try_resolve(&ctx).await? {
508            Ok(resolved) => (resolved, noa.name()),
509            Err(name) => {
510                // `name_or_identity` was a `NameOrIdentity::Name`, but no record
511                // exists yet. Create it now with a fresh identity.
512                allow_creation(&auth)?;
513                let database_auth = SpacetimeAuth::alloc(&ctx).await?;
514                let database_identity = database_auth.identity;
515                let tld: name::Tld = name.clone().into();
516                let tld = match ctx.register_tld(&auth.identity, tld).await.map_err(log_and_500)? {
517                    name::RegisterTldResult::Success { domain }
518                    | name::RegisterTldResult::AlreadyRegistered { domain } => domain,
519                    name::RegisterTldResult::Unauthorized { .. } => {
520                        return Err((
521                            StatusCode::UNAUTHORIZED,
522                            axum::Json(PublishResult::PermissionDenied { name: name.clone() }),
523                        )
524                            .into())
525                    }
526                };
527                let res = ctx
528                    .create_dns_record(&auth.identity, &tld.into(), &database_identity)
529                    .await
530                    .map_err(log_and_500)?;
531                match res {
532                    name::InsertDomainResult::Success { .. } => {}
533                    name::InsertDomainResult::TldNotRegistered { .. }
534                    | name::InsertDomainResult::PermissionDenied { .. } => {
535                        return Err(log_and_500("impossible: we just registered the tld"))
536                    }
537                    name::InsertDomainResult::OtherError(e) => return Err(log_and_500(e)),
538                }
539                (database_identity, Some(name))
540            }
541        },
542        None => {
543            let database_auth = SpacetimeAuth::alloc(&ctx).await?;
544            let database_identity = database_auth.identity;
545            (database_identity, None)
546        }
547    };
548
549    log::trace!("Publishing to the identity: {}", database_identity.to_hex());
550
551    let op = {
552        let exists = ctx
553            .get_database_by_identity(&database_identity)
554            .map_err(log_and_500)?
555            .is_some();
556        if !exists {
557            allow_creation(&auth)?;
558        }
559
560        if clear && exists {
561            ctx.delete_database(&auth.identity, &database_identity)
562                .await
563                .map_err(log_and_500)?;
564        }
565
566        if exists {
567            PublishOp::Updated
568        } else {
569            PublishOp::Created
570        }
571    };
572
573    let num_replicas = num_replicas
574        .map(|n| {
575            let n = u8::try_from(n).map_err(|_| (StatusCode::BAD_REQUEST, "Replication factor {n} out of bounds"))?;
576            Ok::<_, ErrorResponse>(NonZeroU8::new(n))
577        })
578        .transpose()?
579        .flatten();
580
581    let maybe_updated = ctx
582        .publish_database(
583            &auth.identity,
584            DatabaseDef {
585                database_identity,
586                program_bytes: body.into(),
587                num_replicas,
588                host_type: HostType::Wasm,
589            },
590        )
591        .await
592        .map_err(log_and_500)?;
593
594    if let Some(updated) = maybe_updated {
595        match updated {
596            UpdateDatabaseResult::AutoMigrateError(errs) => {
597                return Err((StatusCode::BAD_REQUEST, format!("Database update rejected: {errs}")).into());
598            }
599            UpdateDatabaseResult::ErrorExecutingMigration(err) => {
600                return Err((
601                    StatusCode::BAD_REQUEST,
602                    format!("Failed to create or update the database: {err}"),
603                )
604                    .into());
605            }
606            UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed => {}
607        }
608    }
609
610    Ok(axum::Json(PublishResult::Success {
611        domain: db_name.cloned(),
612        database_identity,
613        op,
614    }))
615}
616
617#[derive(Deserialize)]
618pub struct DeleteDatabaseParams {
619    name_or_identity: NameOrIdentity,
620}
621
622pub async fn delete_database<S: ControlStateDelegate>(
623    State(ctx): State<S>,
624    Path(DeleteDatabaseParams { name_or_identity }): Path<DeleteDatabaseParams>,
625    Extension(auth): Extension<SpacetimeAuth>,
626) -> axum::response::Result<impl IntoResponse> {
627    let database_identity = name_or_identity.resolve(&ctx).await?;
628
629    ctx.delete_database(&auth.identity, &database_identity)
630        .await
631        .map_err(log_and_500)?;
632
633    Ok(())
634}
635
636#[derive(Deserialize)]
637pub struct AddNameParams {
638    name_or_identity: NameOrIdentity,
639}
640
641pub async fn add_name<S: ControlStateDelegate>(
642    State(ctx): State<S>,
643    Path(AddNameParams { name_or_identity }): Path<AddNameParams>,
644    Extension(auth): Extension<SpacetimeAuth>,
645    name: String,
646) -> axum::response::Result<impl IntoResponse> {
647    let name = DatabaseName::try_from(name).map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?;
648    let database_identity = name_or_identity.resolve(&ctx).await?;
649
650    let response = ctx
651        .create_dns_record(&auth.identity, &name.into(), &database_identity)
652        .await
653        // TODO: better error code handling
654        .map_err(log_and_500)?;
655
656    let code = match response {
657        name::InsertDomainResult::Success { .. } => StatusCode::OK,
658        name::InsertDomainResult::TldNotRegistered { .. } => StatusCode::BAD_REQUEST,
659        name::InsertDomainResult::PermissionDenied { .. } => StatusCode::UNAUTHORIZED,
660        name::InsertDomainResult::OtherError(_) => StatusCode::INTERNAL_SERVER_ERROR,
661    };
662
663    Ok((code, axum::Json(response)))
664}
665
666#[derive(Deserialize)]
667pub struct SetNamesParams {
668    name_or_identity: NameOrIdentity,
669}
670
671pub async fn set_names<S: ControlStateDelegate>(
672    State(ctx): State<S>,
673    Path(SetNamesParams { name_or_identity }): Path<SetNamesParams>,
674    Extension(auth): Extension<SpacetimeAuth>,
675    names: axum::Json<Vec<String>>,
676) -> axum::response::Result<impl IntoResponse> {
677    let validated_names = names
678        .0
679        .into_iter()
680        .map(|s| DatabaseName::from_str(&s).map(DomainName::from).map_err(|e| (s, e)))
681        .collect::<Result<Vec<_>, _>>()
682        .map_err(|(input, e)| (StatusCode::BAD_REQUEST, format!("Error parsing `{input}`: {e}")))?;
683
684    let database_identity = name_or_identity.resolve(&ctx).await?;
685
686    let database = ctx.get_database_by_identity(&database_identity).map_err(log_and_500)?;
687    let Some(database) = database else {
688        return Ok((
689            StatusCode::NOT_FOUND,
690            axum::Json(name::SetDomainsResult::DatabaseNotFound),
691        ));
692    };
693
694    if database.owner_identity != auth.identity {
695        return Ok((
696            StatusCode::UNAUTHORIZED,
697            axum::Json(name::SetDomainsResult::NotYourDatabase {
698                database: database.database_identity,
699            }),
700        ));
701    }
702
703    for name in &validated_names {
704        if ctx.lookup_identity(name.as_str()).unwrap().is_some() {
705            return Ok((
706                StatusCode::BAD_REQUEST,
707                axum::Json(name::SetDomainsResult::OtherError(format!(
708                    "Cannot rename to {} because it already is in use.",
709                    name.as_str()
710                ))),
711            ));
712        }
713    }
714
715    let response = ctx
716        .replace_dns_records(&database_identity, &database.owner_identity, &validated_names)
717        .await
718        .map_err(log_and_500)?;
719    let status = match response {
720        name::SetDomainsResult::Success => StatusCode::OK,
721        name::SetDomainsResult::PermissionDenied { .. }
722        | name::SetDomainsResult::PermissionDeniedOnAny { .. }
723        | name::SetDomainsResult::NotYourDatabase { .. } => StatusCode::UNAUTHORIZED,
724        name::SetDomainsResult::DatabaseNotFound => StatusCode::NOT_FOUND,
725        name::SetDomainsResult::OtherError(_) => StatusCode::INTERNAL_SERVER_ERROR,
726    };
727
728    Ok((status, axum::Json(response)))
729}
730
731#[derive(serde::Deserialize)]
732pub struct TimestampParams {
733    name_or_identity: NameOrIdentity,
734}
735
736/// Returns the database's view of the current time,
737/// as a SATS-JSON encoded [`Timestamp`].
738///
739/// Takes a particular database's [`NameOrIdentity`] as an argument
740/// because in a clusterized SpacetimeDB-cloud deployment,
741/// this request will be routed to the node running the requested database.
742async fn get_timestamp<S: ControlStateDelegate>(
743    State(worker_ctx): State<S>,
744    Path(TimestampParams { name_or_identity }): Path<TimestampParams>,
745) -> axum::response::Result<impl IntoResponse> {
746    let db_identity = name_or_identity.resolve(&worker_ctx).await?;
747
748    let _database = worker_ctx_find_database(&worker_ctx, &db_identity)
749        .await?
750        .ok_or_else(|| {
751            log::error!("Could not find database: {}", db_identity.to_hex());
752            NO_SUCH_DATABASE
753        })?;
754
755    Ok(axum::Json(sats::serde::SerdeWrapper(Timestamp::now())).into_response())
756}
757
758/// This struct allows the edition to customize `/database` routes more meticulously.
759pub struct DatabaseRoutes<S> {
760    /// POST /database
761    pub root_post: MethodRouter<S>,
762    /// PUT: /database/:name_or_identity
763    pub db_put: MethodRouter<S>,
764    /// GET: /database/:name_or_identity
765    pub db_get: MethodRouter<S>,
766    /// DELETE: /database/:name_or_identity
767    pub db_delete: MethodRouter<S>,
768    /// GET: /database/:name_or_identity/names
769    pub names_get: MethodRouter<S>,
770    /// POST: /database/:name_or_identity/names
771    pub names_post: MethodRouter<S>,
772    /// PUT: /database/:name_or_identity/names
773    pub names_put: MethodRouter<S>,
774    /// GET: /database/:name_or_identity/identity
775    pub identity_get: MethodRouter<S>,
776    /// GET: /database/:name_or_identity/subscribe
777    pub subscribe_get: MethodRouter<S>,
778    /// POST: /database/:name_or_identity/call/:reducer
779    pub call_reducer_post: MethodRouter<S>,
780    /// GET: /database/:name_or_identity/schema
781    pub schema_get: MethodRouter<S>,
782    /// GET: /database/:name_or_identity/logs
783    pub logs_get: MethodRouter<S>,
784    /// POST: /database/:name_or_identity/sql
785    pub sql_post: MethodRouter<S>,
786
787    /// GET: /database/: name_or_identity/unstable/timestamp
788    pub timestamp_get: MethodRouter<S>,
789}
790
791impl<S> Default for DatabaseRoutes<S>
792where
793    S: NodeDelegate + ControlStateDelegate + HasWebSocketOptions + Clone + 'static,
794{
795    fn default() -> Self {
796        use axum::routing::{delete, get, post, put};
797        Self {
798            root_post: post(publish::<S>),
799            db_put: put(publish::<S>),
800            db_get: get(db_info::<S>),
801            db_delete: delete(delete_database::<S>),
802            names_get: get(get_names::<S>),
803            names_post: post(add_name::<S>),
804            names_put: put(set_names::<S>),
805            identity_get: get(get_identity::<S>),
806            subscribe_get: get(handle_websocket::<S>),
807            call_reducer_post: post(call::<S>),
808            schema_get: get(schema::<S>),
809            logs_get: get(logs::<S>),
810            sql_post: post(sql::<S>),
811            timestamp_get: get(get_timestamp::<S>),
812        }
813    }
814}
815
816impl<S> DatabaseRoutes<S>
817where
818    S: NodeDelegate + ControlStateDelegate + Clone + 'static,
819{
820    pub fn into_router(self, ctx: S) -> axum::Router<S> {
821        let db_router = axum::Router::<S>::new()
822            .route("/", self.db_put)
823            .route("/", self.db_get)
824            .route("/", self.db_delete)
825            .route("/names", self.names_get)
826            .route("/names", self.names_post)
827            .route("/names", self.names_put)
828            .route("/identity", self.identity_get)
829            .route("/subscribe", self.subscribe_get)
830            .route("/call/:reducer", self.call_reducer_post)
831            .route("/schema", self.schema_get)
832            .route("/logs", self.logs_get)
833            .route("/sql", self.sql_post)
834            .route("/unstable/timestamp", self.timestamp_get);
835
836        axum::Router::new()
837            .route("/", self.root_post)
838            .nest("/:name_or_identity", db_router)
839            .route_layer(axum::middleware::from_fn_with_state(ctx, anon_auth_middleware::<S>))
840    }
841}