1use std::num::NonZeroU8;
2use std::str::FromStr;
3use std::time::Duration;
4
5use crate::auth::{
6 anon_auth_middleware, SpacetimeAuth, SpacetimeEnergyUsed, SpacetimeExecutionDurationMicros, SpacetimeIdentity,
7 SpacetimeIdentityToken,
8};
9use crate::routes::subscribe::generate_random_connection_id;
10use crate::util::{ByteStringBody, NameOrIdentity};
11use crate::{log_and_500, ControlStateDelegate, DatabaseDef, NodeDelegate};
12use axum::body::{Body, Bytes};
13use axum::extract::{Path, Query, State};
14use axum::response::{ErrorResponse, IntoResponse};
15use axum::routing::MethodRouter;
16use axum::Extension;
17use axum_extra::TypedHeader;
18use futures::StreamExt;
19use http::StatusCode;
20use serde::Deserialize;
21use spacetimedb::database_logger::DatabaseLogger;
22use spacetimedb::host::module_host::ClientConnectedError;
23use spacetimedb::host::ReducerArgs;
24use spacetimedb::host::ReducerCallError;
25use spacetimedb::host::ReducerOutcome;
26use spacetimedb::host::UpdateDatabaseResult;
27use spacetimedb::identity::Identity;
28use spacetimedb::messages::control_db::{Database, HostType};
29use spacetimedb_client_api_messages::name::{self, DatabaseName, DomainName, PublishOp, PublishResult};
30use spacetimedb_lib::db::raw_def::v9::RawModuleDefV9;
31use spacetimedb_lib::identity::AuthCtx;
32use spacetimedb_lib::{sats, Timestamp};
33
34use super::subscribe::{handle_websocket, HasWebSocketOptions};
35
36#[derive(Deserialize)]
37pub struct CallParams {
38 name_or_identity: NameOrIdentity,
39 reducer: String,
40}
41
/// Status/message pair (HTTP 404) that handlers in this file return when a database lookup finds nothing.
pub const NO_SUCH_DATABASE: (StatusCode, &str) = (StatusCode::NOT_FOUND, "No such database.");
43
44pub async fn call<S: ControlStateDelegate + NodeDelegate>(
45 State(worker_ctx): State<S>,
46 Extension(auth): Extension<SpacetimeAuth>,
47 Path(CallParams {
48 name_or_identity,
49 reducer,
50 }): Path<CallParams>,
51 TypedHeader(content_type): TypedHeader<headers::ContentType>,
52 ByteStringBody(body): ByteStringBody,
53) -> axum::response::Result<impl IntoResponse> {
54 if content_type != headers::ContentType::json() {
55 return Err(axum::extract::rejection::MissingJsonContentType::default().into());
56 }
57 let caller_identity = auth.identity;
58
59 let args = ReducerArgs::Json(body);
60
61 let db_identity = name_or_identity.resolve(&worker_ctx).await?;
62 let database = worker_ctx_find_database(&worker_ctx, &db_identity)
63 .await?
64 .ok_or_else(|| {
65 log::error!("Could not find database: {}", db_identity.to_hex());
66 NO_SUCH_DATABASE
67 })?;
68 let identity = database.owner_identity;
69
70 let leader = worker_ctx
71 .leader(database.id)
72 .await
73 .map_err(log_and_500)?
74 .ok_or(StatusCode::NOT_FOUND)?;
75 let module = leader.module().await.map_err(log_and_500)?;
76
77 let connection_id = generate_random_connection_id();
80
81 match module.call_identity_connected(caller_identity, connection_id).await {
82 Err(ClientConnectedError::Rejected(msg)) => return Err((StatusCode::FORBIDDEN, msg).into()),
85 Err(err @ ClientConnectedError::OutOfEnergy) => {
89 return Err((StatusCode::SERVICE_UNAVAILABLE, err.to_string()).into())
90 }
91 Err(ClientConnectedError::ReducerCall(e)) => {
97 return Err((StatusCode::NOT_FOUND, format!("{:#}", anyhow::anyhow!(e))).into())
98 }
99 Err(e @ ClientConnectedError::DBError(_)) => {
104 return Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into())
105 }
106
107 Ok(()) => (),
109 }
110 let result = match module
111 .call_reducer(caller_identity, Some(connection_id), None, None, None, &reducer, args)
112 .await
113 {
114 Ok(rcr) => Ok(rcr),
115 Err(e) => {
116 let status_code = match e {
117 ReducerCallError::Args(_) => {
118 log::debug!("Attempt to call reducer with invalid arguments");
119 StatusCode::BAD_REQUEST
120 }
121 ReducerCallError::NoSuchModule(_) | ReducerCallError::ScheduleReducerNotFound => StatusCode::NOT_FOUND,
122 ReducerCallError::NoSuchReducer => {
123 log::debug!("Attempt to call non-existent reducer {reducer}");
124 StatusCode::NOT_FOUND
125 }
126 ReducerCallError::LifecycleReducer(lifecycle) => {
127 log::debug!("Attempt to call {lifecycle:?} lifecycle reducer {reducer}");
128 StatusCode::BAD_REQUEST
129 }
130 };
131
132 log::debug!("Error while invoking reducer {e:#}");
133 Err((status_code, format!("{:#}", anyhow::anyhow!(e))))
134 }
135 };
136
137 if let Err(e) = module.call_identity_disconnected(caller_identity, connection_id).await {
138 return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("{:#}", anyhow::anyhow!(e))).into());
143 }
144
145 match result {
146 Ok(result) => {
147 let (status, body) = reducer_outcome_response(&identity, &reducer, result.outcome);
148 Ok((
149 status,
150 TypedHeader(SpacetimeEnergyUsed(result.energy_used)),
151 TypedHeader(SpacetimeExecutionDurationMicros(result.execution_duration)),
152 body,
153 ))
154 }
155 Err(e) => Err((e.0, e.1).into()),
156 }
157}
158
159fn reducer_outcome_response(identity: &Identity, reducer: &str, outcome: ReducerOutcome) -> (StatusCode, String) {
160 match outcome {
161 ReducerOutcome::Committed => (StatusCode::OK, "".to_owned()),
162 ReducerOutcome::Failed(errmsg) => {
163 (StatusCode::from_u16(530).unwrap(), errmsg)
165 }
166 ReducerOutcome::BudgetExceeded => {
167 log::warn!("Node's energy budget exceeded for identity: {identity} while executing {reducer}");
168 (
169 StatusCode::PAYMENT_REQUIRED,
170 "Module energy budget exhausted.".to_owned(),
171 )
172 }
173 }
174}
175
176#[derive(Debug, derive_more::From)]
177pub enum DBCallErr {
178 HandlerError(ErrorResponse),
179 NoSuchDatabase,
180 InstanceNotScheduled,
181}
182
183#[derive(Deserialize)]
184pub struct SchemaParams {
185 name_or_identity: NameOrIdentity,
186}
187#[derive(Deserialize)]
188pub struct SchemaQueryParams {
189 version: SchemaVersion,
190}
191
192#[derive(Deserialize)]
193enum SchemaVersion {
194 #[serde(rename = "9")]
195 V9,
196}
197
198pub async fn schema<S>(
199 State(worker_ctx): State<S>,
200 Path(SchemaParams { name_or_identity }): Path<SchemaParams>,
201 Query(SchemaQueryParams { version }): Query<SchemaQueryParams>,
202 Extension(auth): Extension<SpacetimeAuth>,
203) -> axum::response::Result<impl IntoResponse>
204where
205 S: ControlStateDelegate + NodeDelegate,
206{
207 let db_identity = name_or_identity.resolve(&worker_ctx).await?;
208 let database = worker_ctx_find_database(&worker_ctx, &db_identity)
209 .await?
210 .ok_or(NO_SUCH_DATABASE)?;
211
212 let leader = worker_ctx
213 .leader(database.id)
214 .await
215 .map_err(log_and_500)?
216 .ok_or(StatusCode::NOT_FOUND)?;
217 let module = leader.module().await.map_err(log_and_500)?;
218
219 let module_def = &module.info.module_def;
220 let response_json = match version {
221 SchemaVersion::V9 => {
222 let raw = RawModuleDefV9::from(module_def.clone());
223 axum::Json(sats::serde::SerdeWrapper(raw)).into_response()
224 }
225 };
226
227 Ok((
228 TypedHeader(SpacetimeIdentity(auth.identity)),
229 TypedHeader(SpacetimeIdentityToken(auth.creds)),
230 response_json,
231 ))
232}
233
234#[derive(Deserialize)]
235pub struct DatabaseParam {
236 name_or_identity: NameOrIdentity,
237}
238
/// Serializable projection of a `Database` record, returned by the database-info route.
#[derive(sats::Serialize)]
struct DatabaseResponse {
    /// Identity of the database itself.
    database_identity: Identity,
    /// Identity of the database's owner.
    owner_identity: Identity,
    /// Host type recorded for the database.
    host_type: HostType,
    /// Hash stored in the record's `initial_program` field.
    initial_program: spacetimedb_lib::Hash,
}
246
247impl From<Database> for DatabaseResponse {
248 fn from(db: Database) -> Self {
249 DatabaseResponse {
250 database_identity: db.database_identity,
251 owner_identity: db.owner_identity,
252 host_type: db.host_type,
253 initial_program: db.initial_program,
254 }
255 }
256}
257
258pub async fn db_info<S: ControlStateDelegate>(
259 State(worker_ctx): State<S>,
260 Path(DatabaseParam { name_or_identity }): Path<DatabaseParam>,
261) -> axum::response::Result<impl IntoResponse> {
262 log::trace!("Trying to resolve database identity: {name_or_identity:?}");
263 let database_identity = name_or_identity.resolve(&worker_ctx).await?;
264 log::trace!("Resolved identity to: {database_identity:?}");
265 let database = worker_ctx_find_database(&worker_ctx, &database_identity)
266 .await?
267 .ok_or(NO_SUCH_DATABASE)?;
268 log::trace!("Fetched database from the worker db for database identity: {database_identity:?}");
269
270 let response = DatabaseResponse::from(database);
271 Ok(axum::Json(sats::serde::SerdeWrapper(response)))
272}
273
274#[derive(Deserialize)]
275pub struct LogsParams {
276 name_or_identity: NameOrIdentity,
277}
278
279#[derive(Deserialize)]
280pub struct LogsQuery {
281 num_lines: Option<u32>,
282 #[serde(default)]
283 follow: bool,
284}
285
286pub async fn logs<S>(
287 State(worker_ctx): State<S>,
288 Path(LogsParams { name_or_identity }): Path<LogsParams>,
289 Query(LogsQuery { num_lines, follow }): Query<LogsQuery>,
290 Extension(auth): Extension<SpacetimeAuth>,
291) -> axum::response::Result<impl IntoResponse>
292where
293 S: ControlStateDelegate + NodeDelegate,
294{
295 let database_identity: Identity = name_or_identity.resolve(&worker_ctx).await?;
299 let database = worker_ctx_find_database(&worker_ctx, &database_identity)
300 .await?
301 .ok_or(NO_SUCH_DATABASE)?;
302
303 if database.owner_identity != auth.identity {
304 return Err((
305 StatusCode::BAD_REQUEST,
306 format!(
307 "Identity does not own database, expected: {} got: {}",
308 database.owner_identity.to_hex(),
309 auth.identity.to_hex()
310 ),
311 )
312 .into());
313 }
314
315 let replica = worker_ctx
316 .get_leader_replica_by_database(database.id)
317 .ok_or((StatusCode::NOT_FOUND, "Replica not scheduled to this node yet."))?;
318 let replica_id = replica.id;
319
320 let logs_dir = worker_ctx.module_logs_dir(replica_id);
321 let lines = DatabaseLogger::read_latest(logs_dir, num_lines).await;
322
323 let body = if follow {
324 let leader = worker_ctx
325 .leader(database.id)
326 .await
327 .map_err(log_and_500)?
328 .ok_or(StatusCode::NOT_FOUND)?;
329 let log_rx = leader
330 .module()
331 .await
332 .map_err(log_and_500)?
333 .subscribe_to_logs()
334 .map_err(log_and_500)?;
335
336 let stream = tokio_stream::wrappers::BroadcastStream::new(log_rx).filter_map(move |x| {
337 std::future::ready(match x {
338 Ok(log) => Some(log),
339 Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(skipped)) => {
340 log::trace!(
341 "Skipped {} lines in log for module {}",
342 skipped,
343 database_identity.to_hex()
344 );
345 None
346 }
347 })
348 });
349
350 let stream = futures::stream::once(std::future::ready(lines.into()))
351 .chain(stream)
352 .map(Ok::<_, std::convert::Infallible>);
353
354 Body::from_stream(stream)
355 } else {
356 Body::from(lines)
357 };
358
359 Ok((
360 TypedHeader(headers::CacheControl::new().with_no_cache()),
361 TypedHeader(headers::ContentType::from(mime_ndjson())),
362 body,
363 ))
364}
365
366fn mime_ndjson() -> mime::Mime {
367 "application/x-ndjson".parse().unwrap()
368}
369
370pub(crate) async fn worker_ctx_find_database(
371 worker_ctx: &(impl ControlStateDelegate + ?Sized),
372 database_identity: &Identity,
373) -> axum::response::Result<Option<Database>> {
374 worker_ctx
375 .get_database_by_identity(database_identity)
376 .map_err(log_and_500)
377}
378
379#[derive(Deserialize)]
380pub struct SqlParams {
381 name_or_identity: NameOrIdentity,
382}
383
384#[derive(Deserialize)]
385pub struct SqlQueryParams {}
386
387pub async fn sql<S>(
388 State(worker_ctx): State<S>,
389 Path(SqlParams { name_or_identity }): Path<SqlParams>,
390 Query(SqlQueryParams {}): Query<SqlQueryParams>,
391 Extension(auth): Extension<SpacetimeAuth>,
392 body: String,
393) -> axum::response::Result<impl IntoResponse>
394where
395 S: NodeDelegate + ControlStateDelegate,
396{
397 let db_identity = name_or_identity.resolve(&worker_ctx).await?;
401 let database = worker_ctx_find_database(&worker_ctx, &db_identity)
402 .await?
403 .ok_or(NO_SUCH_DATABASE)?;
404
405 let auth = AuthCtx::new(database.owner_identity, auth.identity);
406 log::debug!("auth: {auth:?}");
407
408 let host = worker_ctx
409 .leader(database.id)
410 .await
411 .map_err(log_and_500)?
412 .ok_or(StatusCode::NOT_FOUND)?;
413 let json = host.exec_sql(auth, database, body).await?;
414
415 let total_duration = json.iter().fold(0, |acc, x| acc + x.total_duration_micros);
416
417 Ok((
418 TypedHeader(SpacetimeExecutionDurationMicros(Duration::from_micros(total_duration))),
419 axum::Json(json),
420 ))
421}
422
423#[derive(Deserialize)]
424pub struct DNSParams {
425 name_or_identity: NameOrIdentity,
426}
427
428#[derive(Deserialize)]
429pub struct ReverseDNSParams {
430 name_or_identity: NameOrIdentity,
431}
432
433#[derive(Deserialize)]
434pub struct DNSQueryParams {}
435
436pub async fn get_identity<S: ControlStateDelegate>(
437 State(ctx): State<S>,
438 Path(DNSParams { name_or_identity }): Path<DNSParams>,
439 Query(DNSQueryParams {}): Query<DNSQueryParams>,
440) -> axum::response::Result<impl IntoResponse> {
441 let identity = name_or_identity.resolve(&ctx).await?;
442 Ok(identity.to_string())
443}
444
445pub async fn get_names<S: ControlStateDelegate>(
446 State(ctx): State<S>,
447 Path(ReverseDNSParams { name_or_identity }): Path<ReverseDNSParams>,
448) -> axum::response::Result<impl IntoResponse> {
449 let database_identity = name_or_identity.resolve(&ctx).await?;
450
451 let names = ctx
452 .reverse_lookup(&database_identity)
453 .map_err(log_and_500)?
454 .into_iter()
455 .filter_map(|x| String::from(x).try_into().ok())
456 .collect();
457
458 let response = name::GetNamesResponse { names };
459 Ok(axum::Json(response))
460}
461
462#[derive(Deserialize)]
463pub struct PublishDatabaseParams {
464 name_or_identity: Option<NameOrIdentity>,
465}
466
467#[derive(Deserialize)]
468pub struct PublishDatabaseQueryParams {
469 #[serde(default)]
470 clear: bool,
471 num_replicas: Option<usize>,
472}
473
474use std::env;
/// Reports whether the `TEMP_REQUIRE_SPACETIME_AUTH` environment variable is set to a non-empty value.
///
/// An unset, unreadable, or empty variable counts as `false`.
fn require_spacetime_auth_for_creation() -> bool {
    env::var("TEMP_REQUIRE_SPACETIME_AUTH")
        .map(|value| !value.is_empty())
        .unwrap_or(false)
}
478
479fn allow_creation(auth: &SpacetimeAuth) -> Result<(), ErrorResponse> {
481 if !require_spacetime_auth_for_creation() {
482 return Ok(());
483 }
484 if auth.issuer.trim_end_matches('/') == "https://auth.spacetimedb.com" {
485 Ok(())
486 } else {
487 log::trace!("Rejecting creation request because auth issuer is {}", auth.issuer);
488 Err((
489 StatusCode::UNAUTHORIZED,
490 "To create a database, you must be logged in with a SpacetimeDB account.",
491 )
492 .into())
493 }
494}
495
496pub async fn publish<S: NodeDelegate + ControlStateDelegate>(
497 State(ctx): State<S>,
498 Path(PublishDatabaseParams { name_or_identity }): Path<PublishDatabaseParams>,
499 Query(PublishDatabaseQueryParams { clear, num_replicas }): Query<PublishDatabaseQueryParams>,
500 Extension(auth): Extension<SpacetimeAuth>,
501 body: Bytes,
502) -> axum::response::Result<axum::Json<PublishResult>> {
503 let (database_identity, db_name) = match &name_or_identity {
507 Some(noa) => match noa.try_resolve(&ctx).await? {
508 Ok(resolved) => (resolved, noa.name()),
509 Err(name) => {
510 allow_creation(&auth)?;
513 let database_auth = SpacetimeAuth::alloc(&ctx).await?;
514 let database_identity = database_auth.identity;
515 let tld: name::Tld = name.clone().into();
516 let tld = match ctx.register_tld(&auth.identity, tld).await.map_err(log_and_500)? {
517 name::RegisterTldResult::Success { domain }
518 | name::RegisterTldResult::AlreadyRegistered { domain } => domain,
519 name::RegisterTldResult::Unauthorized { .. } => {
520 return Err((
521 StatusCode::UNAUTHORIZED,
522 axum::Json(PublishResult::PermissionDenied { name: name.clone() }),
523 )
524 .into())
525 }
526 };
527 let res = ctx
528 .create_dns_record(&auth.identity, &tld.into(), &database_identity)
529 .await
530 .map_err(log_and_500)?;
531 match res {
532 name::InsertDomainResult::Success { .. } => {}
533 name::InsertDomainResult::TldNotRegistered { .. }
534 | name::InsertDomainResult::PermissionDenied { .. } => {
535 return Err(log_and_500("impossible: we just registered the tld"))
536 }
537 name::InsertDomainResult::OtherError(e) => return Err(log_and_500(e)),
538 }
539 (database_identity, Some(name))
540 }
541 },
542 None => {
543 let database_auth = SpacetimeAuth::alloc(&ctx).await?;
544 let database_identity = database_auth.identity;
545 (database_identity, None)
546 }
547 };
548
549 log::trace!("Publishing to the identity: {}", database_identity.to_hex());
550
551 let op = {
552 let exists = ctx
553 .get_database_by_identity(&database_identity)
554 .map_err(log_and_500)?
555 .is_some();
556 if !exists {
557 allow_creation(&auth)?;
558 }
559
560 if clear && exists {
561 ctx.delete_database(&auth.identity, &database_identity)
562 .await
563 .map_err(log_and_500)?;
564 }
565
566 if exists {
567 PublishOp::Updated
568 } else {
569 PublishOp::Created
570 }
571 };
572
573 let num_replicas = num_replicas
574 .map(|n| {
575 let n = u8::try_from(n).map_err(|_| (StatusCode::BAD_REQUEST, "Replication factor {n} out of bounds"))?;
576 Ok::<_, ErrorResponse>(NonZeroU8::new(n))
577 })
578 .transpose()?
579 .flatten();
580
581 let maybe_updated = ctx
582 .publish_database(
583 &auth.identity,
584 DatabaseDef {
585 database_identity,
586 program_bytes: body.into(),
587 num_replicas,
588 host_type: HostType::Wasm,
589 },
590 )
591 .await
592 .map_err(log_and_500)?;
593
594 if let Some(updated) = maybe_updated {
595 match updated {
596 UpdateDatabaseResult::AutoMigrateError(errs) => {
597 return Err((StatusCode::BAD_REQUEST, format!("Database update rejected: {errs}")).into());
598 }
599 UpdateDatabaseResult::ErrorExecutingMigration(err) => {
600 return Err((
601 StatusCode::BAD_REQUEST,
602 format!("Failed to create or update the database: {err}"),
603 )
604 .into());
605 }
606 UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed => {}
607 }
608 }
609
610 Ok(axum::Json(PublishResult::Success {
611 domain: db_name.cloned(),
612 database_identity,
613 op,
614 }))
615}
616
617#[derive(Deserialize)]
618pub struct DeleteDatabaseParams {
619 name_or_identity: NameOrIdentity,
620}
621
622pub async fn delete_database<S: ControlStateDelegate>(
623 State(ctx): State<S>,
624 Path(DeleteDatabaseParams { name_or_identity }): Path<DeleteDatabaseParams>,
625 Extension(auth): Extension<SpacetimeAuth>,
626) -> axum::response::Result<impl IntoResponse> {
627 let database_identity = name_or_identity.resolve(&ctx).await?;
628
629 ctx.delete_database(&auth.identity, &database_identity)
630 .await
631 .map_err(log_and_500)?;
632
633 Ok(())
634}
635
636#[derive(Deserialize)]
637pub struct AddNameParams {
638 name_or_identity: NameOrIdentity,
639}
640
641pub async fn add_name<S: ControlStateDelegate>(
642 State(ctx): State<S>,
643 Path(AddNameParams { name_or_identity }): Path<AddNameParams>,
644 Extension(auth): Extension<SpacetimeAuth>,
645 name: String,
646) -> axum::response::Result<impl IntoResponse> {
647 let name = DatabaseName::try_from(name).map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?;
648 let database_identity = name_or_identity.resolve(&ctx).await?;
649
650 let response = ctx
651 .create_dns_record(&auth.identity, &name.into(), &database_identity)
652 .await
653 .map_err(log_and_500)?;
655
656 let code = match response {
657 name::InsertDomainResult::Success { .. } => StatusCode::OK,
658 name::InsertDomainResult::TldNotRegistered { .. } => StatusCode::BAD_REQUEST,
659 name::InsertDomainResult::PermissionDenied { .. } => StatusCode::UNAUTHORIZED,
660 name::InsertDomainResult::OtherError(_) => StatusCode::INTERNAL_SERVER_ERROR,
661 };
662
663 Ok((code, axum::Json(response)))
664}
665
666#[derive(Deserialize)]
667pub struct SetNamesParams {
668 name_or_identity: NameOrIdentity,
669}
670
671pub async fn set_names<S: ControlStateDelegate>(
672 State(ctx): State<S>,
673 Path(SetNamesParams { name_or_identity }): Path<SetNamesParams>,
674 Extension(auth): Extension<SpacetimeAuth>,
675 names: axum::Json<Vec<String>>,
676) -> axum::response::Result<impl IntoResponse> {
677 let validated_names = names
678 .0
679 .into_iter()
680 .map(|s| DatabaseName::from_str(&s).map(DomainName::from).map_err(|e| (s, e)))
681 .collect::<Result<Vec<_>, _>>()
682 .map_err(|(input, e)| (StatusCode::BAD_REQUEST, format!("Error parsing `{input}`: {e}")))?;
683
684 let database_identity = name_or_identity.resolve(&ctx).await?;
685
686 let database = ctx.get_database_by_identity(&database_identity).map_err(log_and_500)?;
687 let Some(database) = database else {
688 return Ok((
689 StatusCode::NOT_FOUND,
690 axum::Json(name::SetDomainsResult::DatabaseNotFound),
691 ));
692 };
693
694 if database.owner_identity != auth.identity {
695 return Ok((
696 StatusCode::UNAUTHORIZED,
697 axum::Json(name::SetDomainsResult::NotYourDatabase {
698 database: database.database_identity,
699 }),
700 ));
701 }
702
703 for name in &validated_names {
704 if ctx.lookup_identity(name.as_str()).unwrap().is_some() {
705 return Ok((
706 StatusCode::BAD_REQUEST,
707 axum::Json(name::SetDomainsResult::OtherError(format!(
708 "Cannot rename to {} because it already is in use.",
709 name.as_str()
710 ))),
711 ));
712 }
713 }
714
715 let response = ctx
716 .replace_dns_records(&database_identity, &database.owner_identity, &validated_names)
717 .await
718 .map_err(log_and_500)?;
719 let status = match response {
720 name::SetDomainsResult::Success => StatusCode::OK,
721 name::SetDomainsResult::PermissionDenied { .. }
722 | name::SetDomainsResult::PermissionDeniedOnAny { .. }
723 | name::SetDomainsResult::NotYourDatabase { .. } => StatusCode::UNAUTHORIZED,
724 name::SetDomainsResult::DatabaseNotFound => StatusCode::NOT_FOUND,
725 name::SetDomainsResult::OtherError(_) => StatusCode::INTERNAL_SERVER_ERROR,
726 };
727
728 Ok((status, axum::Json(response)))
729}
730
731#[derive(serde::Deserialize)]
732pub struct TimestampParams {
733 name_or_identity: NameOrIdentity,
734}
735
736async fn get_timestamp<S: ControlStateDelegate>(
743 State(worker_ctx): State<S>,
744 Path(TimestampParams { name_or_identity }): Path<TimestampParams>,
745) -> axum::response::Result<impl IntoResponse> {
746 let db_identity = name_or_identity.resolve(&worker_ctx).await?;
747
748 let _database = worker_ctx_find_database(&worker_ctx, &db_identity)
749 .await?
750 .ok_or_else(|| {
751 log::error!("Could not find database: {}", db_identity.to_hex());
752 NO_SUCH_DATABASE
753 })?;
754
755 Ok(axum::Json(sats::serde::SerdeWrapper(Timestamp::now())).into_response())
756}
757
/// The set of method routers that make up the database API.
///
/// Each field is public so a caller can replace individual handlers before
/// calling `into_router`. Paths below are relative to where the resulting
/// router is mounted.
pub struct DatabaseRoutes<S> {
    /// POST `/`
    pub root_post: MethodRouter<S>,
    /// PUT `/:name_or_identity`
    pub db_put: MethodRouter<S>,
    /// GET `/:name_or_identity`
    pub db_get: MethodRouter<S>,
    /// DELETE `/:name_or_identity`
    pub db_delete: MethodRouter<S>,
    /// GET `/:name_or_identity/names`
    pub names_get: MethodRouter<S>,
    /// POST `/:name_or_identity/names`
    pub names_post: MethodRouter<S>,
    /// PUT `/:name_or_identity/names`
    pub names_put: MethodRouter<S>,
    /// GET `/:name_or_identity/identity`
    pub identity_get: MethodRouter<S>,
    /// GET `/:name_or_identity/subscribe`
    pub subscribe_get: MethodRouter<S>,
    /// POST `/:name_or_identity/call/:reducer`
    pub call_reducer_post: MethodRouter<S>,
    /// GET `/:name_or_identity/schema`
    pub schema_get: MethodRouter<S>,
    /// GET `/:name_or_identity/logs`
    pub logs_get: MethodRouter<S>,
    /// POST `/:name_or_identity/sql`
    pub sql_post: MethodRouter<S>,

    /// GET `/:name_or_identity/unstable/timestamp`
    pub timestamp_get: MethodRouter<S>,
}
790
791impl<S> Default for DatabaseRoutes<S>
792where
793 S: NodeDelegate + ControlStateDelegate + HasWebSocketOptions + Clone + 'static,
794{
795 fn default() -> Self {
796 use axum::routing::{delete, get, post, put};
797 Self {
798 root_post: post(publish::<S>),
799 db_put: put(publish::<S>),
800 db_get: get(db_info::<S>),
801 db_delete: delete(delete_database::<S>),
802 names_get: get(get_names::<S>),
803 names_post: post(add_name::<S>),
804 names_put: put(set_names::<S>),
805 identity_get: get(get_identity::<S>),
806 subscribe_get: get(handle_websocket::<S>),
807 call_reducer_post: post(call::<S>),
808 schema_get: get(schema::<S>),
809 logs_get: get(logs::<S>),
810 sql_post: post(sql::<S>),
811 timestamp_get: get(get_timestamp::<S>),
812 }
813 }
814}
815
816impl<S> DatabaseRoutes<S>
817where
818 S: NodeDelegate + ControlStateDelegate + Clone + 'static,
819{
820 pub fn into_router(self, ctx: S) -> axum::Router<S> {
821 let db_router = axum::Router::<S>::new()
822 .route("/", self.db_put)
823 .route("/", self.db_get)
824 .route("/", self.db_delete)
825 .route("/names", self.names_get)
826 .route("/names", self.names_post)
827 .route("/names", self.names_put)
828 .route("/identity", self.identity_get)
829 .route("/subscribe", self.subscribe_get)
830 .route("/call/:reducer", self.call_reducer_post)
831 .route("/schema", self.schema_get)
832 .route("/logs", self.logs_get)
833 .route("/sql", self.sql_post)
834 .route("/unstable/timestamp", self.timestamp_get);
835
836 axum::Router::new()
837 .route("/", self.root_post)
838 .nest("/:name_or_identity", db_router)
839 .route_layer(axum::middleware::from_fn_with_state(ctx, anon_auth_middleware::<S>))
840 }
841}