1mod control_db;
2pub mod subcommands;
3pub mod util;
4pub mod version;
5
6use crate::control_db::ControlDb;
7use crate::subcommands::{extract_schema, start};
8use anyhow::{ensure, Context, Ok};
9use async_trait::async_trait;
10use clap::{ArgMatches, Command};
11use spacetimedb::client::ClientActorIndex;
12use spacetimedb::config::{CertificateAuthority, MetadataFile};
13use spacetimedb::db::{self, relational_db};
14use spacetimedb::energy::{EnergyBalance, EnergyQuanta, NullEnergyMonitor};
15use spacetimedb::host::{
16 DiskStorage, DurabilityProvider, ExternalDurability, HostController, StartSnapshotWatcher, UpdateDatabaseResult,
17};
18use spacetimedb::identity::Identity;
19use spacetimedb::messages::control_db::{Database, Node, Replica};
20use spacetimedb::util::jobs::JobCores;
21use spacetimedb::worker_metrics::WORKER_METRICS;
22use spacetimedb_client_api::auth::{self, LOCALHOST};
23use spacetimedb_client_api::routes::subscribe::{HasWebSocketOptions, WebSocketOptions};
24use spacetimedb_client_api::{Host, NodeDelegate};
25use spacetimedb_client_api_messages::name::{DomainName, InsertDomainResult, RegisterTldResult, SetDomainsResult, Tld};
26use spacetimedb_datastore::db_metrics::data_size::DATA_SIZE_METRICS;
27use spacetimedb_datastore::db_metrics::DB_METRICS;
28use spacetimedb_datastore::traits::Program;
29use spacetimedb_paths::server::{ModuleLogsDir, PidFile, ServerDataDir};
30use spacetimedb_paths::standalone::StandaloneDataDirExt;
31use spacetimedb_table::page_pool::PagePool;
32use std::sync::Arc;
33
34pub use spacetimedb_client_api::routes::subscribe::{BIN_PROTOCOL, TEXT_PROTOCOL};
35
36#[derive(Clone, Copy)]
37pub struct StandaloneOptions {
38 pub db_config: db::Config,
39 pub websocket: WebSocketOptions,
40}
41
42pub struct StandaloneEnv {
43 control_db: ControlDb,
44 program_store: Arc<DiskStorage>,
45 host_controller: HostController,
46 client_actor_index: ClientActorIndex,
47 metrics_registry: prometheus::Registry,
48 _pid_file: PidFile,
49 auth_provider: auth::DefaultJwtAuthProvider,
50 websocket_options: WebSocketOptions,
51}
52
53impl StandaloneEnv {
54 pub async fn init(
55 config: StandaloneOptions,
56 certs: &CertificateAuthority,
57 data_dir: Arc<ServerDataDir>,
58 db_cores: JobCores,
59 ) -> anyhow::Result<Arc<Self>> {
60 let _pid_file = data_dir.pid_file()?;
61 let meta_path = data_dir.metadata_toml();
62 let mut meta = MetadataFile::new("standalone");
63 if let Some(existing_meta) = MetadataFile::read(&meta_path).context("failed reading metadata.toml")? {
64 meta = existing_meta.check_compatibility_and_update(meta)?;
65 }
66 meta.write(&meta_path).context("failed writing metadata.toml")?;
67
68 let control_db = ControlDb::new(&data_dir.control_db()).context("failed to initialize control db")?;
69 let energy_monitor = Arc::new(NullEnergyMonitor);
70 let program_store = Arc::new(DiskStorage::new(data_dir.program_bytes().0).await?);
71
72 let durability_provider = Arc::new(StandaloneDurabilityProvider {
73 data_dir: data_dir.clone(),
74 });
75 let host_controller = HostController::new(
76 data_dir,
77 config.db_config,
78 program_store.clone(),
79 energy_monitor,
80 durability_provider,
81 db_cores,
82 );
83 let client_actor_index = ClientActorIndex::new();
84 let jwt_keys = certs.get_or_create_keys()?;
85
86 let auth_env = auth::default_auth_environment(jwt_keys, LOCALHOST.to_owned());
87
88 let metrics_registry = prometheus::Registry::new();
89 metrics_registry.register(Box::new(&*WORKER_METRICS)).unwrap();
90 metrics_registry.register(Box::new(&*DB_METRICS)).unwrap();
91 metrics_registry.register(Box::new(&*DATA_SIZE_METRICS)).unwrap();
92
93 Ok(Arc::new(Self {
94 control_db,
95 program_store,
96 host_controller,
97 client_actor_index,
98 metrics_registry,
99 _pid_file,
100 auth_provider: auth_env,
101 websocket_options: config.websocket,
102 }))
103 }
104
105 pub fn data_dir(&self) -> &Arc<ServerDataDir> {
106 &self.host_controller.data_dir
107 }
108
109 pub fn page_pool(&self) -> &PagePool {
110 &self.host_controller.page_pool
111 }
112}
113
114struct StandaloneDurabilityProvider {
115 data_dir: Arc<ServerDataDir>,
116}
117
118#[async_trait]
119impl DurabilityProvider for StandaloneDurabilityProvider {
120 async fn durability(&self, replica_id: u64) -> anyhow::Result<(ExternalDurability, Option<StartSnapshotWatcher>)> {
121 let commitlog_dir = self.data_dir.replica(replica_id).commit_log();
122 let (durability, disk_size) = relational_db::local_durability(commitlog_dir).await?;
123 let start_snapshot_watcher = {
124 let durability = durability.clone();
125 |snapshot_rx| {
126 tokio::spawn(relational_db::snapshot_watching_commitlog_compressor(
127 snapshot_rx,
128 durability,
129 ));
130 }
131 };
132 Ok(((durability, disk_size), Some(Box::new(start_snapshot_watcher))))
133 }
134}
135
136#[async_trait]
137impl NodeDelegate for StandaloneEnv {
138 fn gather_metrics(&self) -> Vec<prometheus::proto::MetricFamily> {
139 self.metrics_registry.gather()
140 }
141
142 fn client_actor_index(&self) -> &ClientActorIndex {
143 &self.client_actor_index
144 }
145
146 type JwtAuthProviderT = auth::DefaultJwtAuthProvider;
147
148 fn jwt_auth_provider(&self) -> &Self::JwtAuthProviderT {
149 &self.auth_provider
150 }
151
152 async fn leader(&self, database_id: u64) -> anyhow::Result<Option<Host>> {
153 let leader = match self.control_db.get_leader_replica_by_database(database_id) {
154 Some(leader) => leader,
155 None => return Ok(None),
156 };
157
158 let database = self
159 .control_db
160 .get_database_by_id(database_id)?
161 .with_context(|| format!("Database {database_id} not found"))?;
162
163 self.host_controller
164 .get_or_launch_module_host(database, leader.id)
165 .await
166 .context("failed to get or launch module host")?;
167
168 Ok(Some(Host::new(leader.id, self.host_controller.clone())))
169 }
170 fn module_logs_dir(&self, replica_id: u64) -> ModuleLogsDir {
171 self.data_dir().replica(replica_id).module_logs()
172 }
173}
174
175impl spacetimedb_client_api::ControlStateReadAccess for StandaloneEnv {
176 fn get_node_id(&self) -> Option<u64> {
178 Some(0)
179 }
180
181 fn get_node_by_id(&self, node_id: u64) -> anyhow::Result<Option<Node>> {
182 if node_id == 0 {
183 return Ok(Some(Node {
184 id: 0,
185 unschedulable: false,
186 advertise_addr: Some("node:80".to_owned()),
187 }));
188 }
189 Ok(None)
190 }
191
192 fn get_nodes(&self) -> anyhow::Result<Vec<Node>> {
193 Ok(vec![self.get_node_by_id(0)?.unwrap()])
194 }
195
196 fn get_database_by_id(&self, id: u64) -> anyhow::Result<Option<Database>> {
198 Ok(self.control_db.get_database_by_id(id)?)
199 }
200
201 fn get_database_by_identity(&self, database_identity: &Identity) -> anyhow::Result<Option<Database>> {
202 Ok(self.control_db.get_database_by_identity(database_identity)?)
203 }
204
205 fn get_databases(&self) -> anyhow::Result<Vec<Database>> {
206 Ok(self.control_db.get_databases()?)
207 }
208
209 fn get_replica_by_id(&self, id: u64) -> anyhow::Result<Option<Replica>> {
211 Ok(self.control_db.get_replica_by_id(id)?)
212 }
213
214 fn get_replicas(&self) -> anyhow::Result<Vec<Replica>> {
215 Ok(self.control_db.get_replicas()?)
216 }
217
218 fn get_leader_replica_by_database(&self, database_id: u64) -> Option<Replica> {
219 self.control_db.get_leader_replica_by_database(database_id)
220 }
221 fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result<Option<EnergyBalance>> {
223 Ok(self.control_db.get_energy_balance(identity)?)
224 }
225
226 fn lookup_identity(&self, domain: &str) -> anyhow::Result<Option<Identity>> {
228 Ok(self.control_db.spacetime_dns(domain)?)
229 }
230
231 fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result<Vec<DomainName>> {
232 Ok(self.control_db.spacetime_reverse_dns(database_identity)?)
233 }
234}
235
236#[async_trait]
237impl spacetimedb_client_api::ControlStateWriteAccess for StandaloneEnv {
238 async fn publish_database(
239 &self,
240 publisher: &Identity,
241 spec: spacetimedb_client_api::DatabaseDef,
242 ) -> anyhow::Result<Option<UpdateDatabaseResult>> {
243 let existing_db = self.control_db.get_database_by_identity(&spec.database_identity)?;
244
245 let num_replicas = 1;
247
248 match existing_db {
249 None => {
251 let program = Program::from_bytes(&spec.program_bytes[..]);
252
253 let mut database = Database {
254 id: 0,
255 database_identity: spec.database_identity,
256 owner_identity: *publisher,
257 host_type: spec.host_type,
258 initial_program: program.hash,
259 };
260
261 let _hash_for_assert = program.hash;
262
263 self.host_controller
266 .check_module_validity(database.clone(), program)
267 .await?;
268
269 let program_hash = self.program_store.put(&spec.program_bytes).await?;
270
271 debug_assert_eq!(_hash_for_assert, program_hash);
272
273 let database_id = self.control_db.insert_database(database.clone())?;
274 database.id = database_id;
275
276 self.schedule_replicas(database_id, num_replicas).await?;
277
278 Ok(None)
279 }
280 Some(database) => {
283 ensure!(
284 &database.owner_identity == publisher,
285 "Permission denied: `{}` does not own database `{}`",
286 publisher,
287 spec.database_identity.to_abbreviated_hex()
288 );
289
290 let database_id = database.id;
291 let database_identity = database.database_identity;
292
293 let leader = self
294 .leader(database_id)
295 .await?
296 .ok_or_else(|| anyhow::anyhow!("No leader for database"))?;
297 let update_result = leader
298 .update(database, spec.host_type, spec.program_bytes.into())
299 .await?;
300 if update_result.was_successful() {
301 let replicas = self.control_db.get_replicas_by_database(database_id)?;
302 let desired_replicas = num_replicas as usize;
303 if desired_replicas == 0 {
304 log::info!("Decommissioning all replicas of database {database_identity}");
305 for instance in replicas {
306 self.delete_replica(instance.id).await?;
307 }
308 } else if desired_replicas > replicas.len() {
309 let n = desired_replicas - replicas.len();
310 log::info!(
311 "Scaling up database {} from {} to {} replicas",
312 database_identity,
313 replicas.len(),
314 n
315 );
316 for _ in 0..n {
317 self.insert_replica(Replica {
318 id: 0,
319 database_id,
320 node_id: 0,
321 leader: false,
322 })
323 .await?;
324 }
325 } else if desired_replicas < replicas.len() {
326 let n = replicas.len() - desired_replicas;
327 log::info!(
328 "Scaling down database {} from {} to {} replicas",
329 database_identity,
330 replicas.len(),
331 n
332 );
333 for instance in replicas.into_iter().filter(|instance| !instance.leader).take(n) {
334 self.delete_replica(instance.id).await?;
335 }
336 } else {
337 log::debug!(
338 "Desired replica count {desired_replicas} for database {database_identity} already satisfied"
339 );
340 }
341 }
342
343 anyhow::Ok(Some(update_result))
344 }
345 }
346 }
347
348 async fn delete_database(&self, caller_identity: &Identity, database_identity: &Identity) -> anyhow::Result<()> {
349 let Some(database) = self.control_db.get_database_by_identity(database_identity)? else {
350 return Ok(());
351 };
352 anyhow::ensure!(
353 &database.owner_identity == caller_identity,
354 "Permission denied: `{caller_identity}` does not own database `{}`",
358 database_identity.to_abbreviated_hex()
359 );
360
361 self.control_db.delete_database(database.id)?;
362
363 for instance in self.control_db.get_replicas_by_database(database.id)? {
364 self.delete_replica(instance.id).await?;
365 }
366
367 Ok(())
368 }
369
370 async fn add_energy(&self, identity: &Identity, amount: EnergyQuanta) -> anyhow::Result<()> {
371 let balance = self
372 .control_db
373 .get_energy_balance(identity)?
374 .unwrap_or(EnergyBalance::ZERO);
375
376 let balance = balance.saturating_add_energy(amount);
377
378 self.control_db.set_energy_balance(*identity, balance)?;
379 Ok(())
380 }
381 async fn withdraw_energy(&self, _identity: &Identity, _amount: EnergyQuanta) -> anyhow::Result<()> {
382 Ok(())
384 }
385
386 async fn register_tld(&self, identity: &Identity, tld: Tld) -> anyhow::Result<RegisterTldResult> {
387 Ok(self.control_db.spacetime_register_tld(tld, *identity)?)
388 }
389
390 async fn create_dns_record(
391 &self,
392 owner_identity: &Identity,
393 domain: &DomainName,
394 database_identity: &Identity,
395 ) -> anyhow::Result<InsertDomainResult> {
396 Ok(self
397 .control_db
398 .spacetime_insert_domain(database_identity, domain.clone(), *owner_identity, true)?)
399 }
400
401 async fn replace_dns_records(
402 &self,
403 database_identity: &Identity,
404 owner_identity: &Identity,
405 domain_names: &[DomainName],
406 ) -> anyhow::Result<SetDomainsResult> {
407 Ok(self
408 .control_db
409 .spacetime_replace_domains(database_identity, owner_identity, domain_names)?)
410 }
411}
412
413impl StandaloneEnv {
414 async fn insert_replica(&self, replica: Replica) -> Result<(), anyhow::Error> {
415 let mut new_replica = replica.clone();
416 let id = self.control_db.insert_replica(replica)?;
417 new_replica.id = id;
418
419 self.on_insert_replica(&new_replica).await?;
420
421 Ok(())
422 }
423
424 async fn delete_replica(&self, replica_id: u64) -> Result<(), anyhow::Error> {
425 self.control_db.delete_replica(replica_id)?;
426 self.on_delete_replica(replica_id).await?;
427
428 Ok(())
429 }
430
431 async fn schedule_replicas(&self, database_id: u64, num_replicas: u8) -> Result<(), anyhow::Error> {
432 for i in 0..num_replicas {
434 let replica = Replica {
435 id: 0,
436 database_id,
437 node_id: 0,
438 leader: i == 0,
439 };
440 self.insert_replica(replica).await?;
441 }
442
443 Ok(())
444 }
445
446 async fn on_insert_replica(&self, instance: &Replica) -> Result<(), anyhow::Error> {
447 if instance.leader {
448 let database = self
449 .control_db
450 .get_database_by_id(instance.database_id)?
451 .with_context(|| {
452 format!(
453 "unknown database: id: {}, instance: {}",
454 instance.database_id, instance.id
455 )
456 })?;
457 self.leader(database.id).await?;
458 }
459
460 Ok(())
461 }
462
463 async fn on_delete_replica(&self, replica_id: u64) -> anyhow::Result<()> {
464 self.host_controller.exit_module_host(replica_id).await?;
469
470 Ok(())
471 }
472}
473
474impl HasWebSocketOptions for StandaloneEnv {
475 fn websocket_options(&self) -> WebSocketOptions {
476 self.websocket_options
477 }
478}
479
480pub async fn exec_subcommand(cmd: &str, args: &ArgMatches, db_cores: JobCores) -> Result<(), anyhow::Error> {
481 match cmd {
482 "start" => start::exec(args, db_cores).await,
483 "extract-schema" => extract_schema::exec(args).await,
484 unknown => Err(anyhow::anyhow!("Invalid subcommand: {}", unknown)),
485 }
486}
487
488pub fn get_subcommands() -> Vec<Command> {
489 vec![start::cli(), extract_schema::cli()]
490}
491
492pub async fn start_server(data_dir: &ServerDataDir, cert_dir: Option<&std::path::Path>) -> anyhow::Result<()> {
493 let mut args: Vec<&std::ffi::OsStr> = vec!["start".as_ref(), "--data-dir".as_ref(), data_dir.0.as_os_str()];
494 if let Some(cert_dir) = &cert_dir {
495 args.extend(["--jwt-key-dir".as_ref(), cert_dir.as_os_str()])
496 }
497 let args = start::cli().try_get_matches_from(args)?;
498 start::exec(&args, JobCores::default()).await
499}
500
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;
    use spacetimedb::db::Storage;
    use spacetimedb_paths::{cli::*, FromPathUnchecked};
    use std::fs;
    use tempfile::TempDir;

    /// A second `StandaloneEnv::init` on the same data directory must fail
    /// while the first environment is still alive.
    #[tokio::test]
    async fn ensure_init_grabs_lock() -> Result<()> {
        let tempdir = TempDir::new()?;
        let keys_dir = tempdir.path().join("keys");
        let data_dir = Arc::new(ServerDataDir::from_path_unchecked(tempdir.path().join("data")));

        fs::create_dir(&keys_dir)?;
        data_dir.create()?;

        let ca = CertificateAuthority {
            jwt_pub_key_path: PubKeyPath(keys_dir.join("public")),
            jwt_priv_key_path: PrivKeyPath(keys_dir.join("private")),
        };
        ca.get_or_create_keys()?;

        let config = StandaloneOptions {
            db_config: db::Config {
                storage: Storage::Memory,
                page_pool_max_size: None,
            },
            websocket: WebSocketOptions::default(),
        };

        // Keep the first environment bound to a name so it stays alive for the
        // whole test.
        let _first_env = StandaloneEnv::init(config, &ca, data_dir.clone(), Default::default()).await?;
        let second_attempt = StandaloneEnv::init(config, &ca, data_dir.clone(), Default::default()).await;
        assert!(second_attempt.is_err());

        Ok(())
    }
}