spacetimedb_standalone/
lib.rs

1mod control_db;
2pub mod subcommands;
3pub mod util;
4pub mod version;
5
6use crate::control_db::ControlDb;
7use crate::subcommands::{extract_schema, start};
8use anyhow::{ensure, Context, Ok};
9use async_trait::async_trait;
10use clap::{ArgMatches, Command};
11use spacetimedb::client::ClientActorIndex;
12use spacetimedb::config::{CertificateAuthority, MetadataFile};
13use spacetimedb::db::{self, relational_db};
14use spacetimedb::energy::{EnergyBalance, EnergyQuanta, NullEnergyMonitor};
15use spacetimedb::host::{
16    DiskStorage, DurabilityProvider, ExternalDurability, HostController, StartSnapshotWatcher, UpdateDatabaseResult,
17};
18use spacetimedb::identity::Identity;
19use spacetimedb::messages::control_db::{Database, Node, Replica};
20use spacetimedb::util::jobs::JobCores;
21use spacetimedb::worker_metrics::WORKER_METRICS;
22use spacetimedb_client_api::auth::{self, LOCALHOST};
23use spacetimedb_client_api::routes::subscribe::{HasWebSocketOptions, WebSocketOptions};
24use spacetimedb_client_api::{Host, NodeDelegate};
25use spacetimedb_client_api_messages::name::{DomainName, InsertDomainResult, RegisterTldResult, SetDomainsResult, Tld};
26use spacetimedb_datastore::db_metrics::data_size::DATA_SIZE_METRICS;
27use spacetimedb_datastore::db_metrics::DB_METRICS;
28use spacetimedb_datastore::traits::Program;
29use spacetimedb_paths::server::{ModuleLogsDir, PidFile, ServerDataDir};
30use spacetimedb_paths::standalone::StandaloneDataDirExt;
31use spacetimedb_table::page_pool::PagePool;
32use std::sync::Arc;
33
34pub use spacetimedb_client_api::routes::subscribe::{BIN_PROTOCOL, TEXT_PROTOCOL};
35
36#[derive(Clone, Copy)]
37pub struct StandaloneOptions {
38    pub db_config: db::Config,
39    pub websocket: WebSocketOptions,
40}
41
42pub struct StandaloneEnv {
43    control_db: ControlDb,
44    program_store: Arc<DiskStorage>,
45    host_controller: HostController,
46    client_actor_index: ClientActorIndex,
47    metrics_registry: prometheus::Registry,
48    _pid_file: PidFile,
49    auth_provider: auth::DefaultJwtAuthProvider,
50    websocket_options: WebSocketOptions,
51}
52
53impl StandaloneEnv {
54    pub async fn init(
55        config: StandaloneOptions,
56        certs: &CertificateAuthority,
57        data_dir: Arc<ServerDataDir>,
58        db_cores: JobCores,
59    ) -> anyhow::Result<Arc<Self>> {
60        let _pid_file = data_dir.pid_file()?;
61        let meta_path = data_dir.metadata_toml();
62        let mut meta = MetadataFile::new("standalone");
63        if let Some(existing_meta) = MetadataFile::read(&meta_path).context("failed reading metadata.toml")? {
64            meta = existing_meta.check_compatibility_and_update(meta)?;
65        }
66        meta.write(&meta_path).context("failed writing metadata.toml")?;
67
68        let control_db = ControlDb::new(&data_dir.control_db()).context("failed to initialize control db")?;
69        let energy_monitor = Arc::new(NullEnergyMonitor);
70        let program_store = Arc::new(DiskStorage::new(data_dir.program_bytes().0).await?);
71
72        let durability_provider = Arc::new(StandaloneDurabilityProvider {
73            data_dir: data_dir.clone(),
74        });
75        let host_controller = HostController::new(
76            data_dir,
77            config.db_config,
78            program_store.clone(),
79            energy_monitor,
80            durability_provider,
81            db_cores,
82        );
83        let client_actor_index = ClientActorIndex::new();
84        let jwt_keys = certs.get_or_create_keys()?;
85
86        let auth_env = auth::default_auth_environment(jwt_keys, LOCALHOST.to_owned());
87
88        let metrics_registry = prometheus::Registry::new();
89        metrics_registry.register(Box::new(&*WORKER_METRICS)).unwrap();
90        metrics_registry.register(Box::new(&*DB_METRICS)).unwrap();
91        metrics_registry.register(Box::new(&*DATA_SIZE_METRICS)).unwrap();
92
93        Ok(Arc::new(Self {
94            control_db,
95            program_store,
96            host_controller,
97            client_actor_index,
98            metrics_registry,
99            _pid_file,
100            auth_provider: auth_env,
101            websocket_options: config.websocket,
102        }))
103    }
104
105    pub fn data_dir(&self) -> &Arc<ServerDataDir> {
106        &self.host_controller.data_dir
107    }
108
109    pub fn page_pool(&self) -> &PagePool {
110        &self.host_controller.page_pool
111    }
112}
113
114struct StandaloneDurabilityProvider {
115    data_dir: Arc<ServerDataDir>,
116}
117
118#[async_trait]
119impl DurabilityProvider for StandaloneDurabilityProvider {
120    async fn durability(&self, replica_id: u64) -> anyhow::Result<(ExternalDurability, Option<StartSnapshotWatcher>)> {
121        let commitlog_dir = self.data_dir.replica(replica_id).commit_log();
122        let (durability, disk_size) = relational_db::local_durability(commitlog_dir).await?;
123        let start_snapshot_watcher = {
124            let durability = durability.clone();
125            |snapshot_rx| {
126                tokio::spawn(relational_db::snapshot_watching_commitlog_compressor(
127                    snapshot_rx,
128                    durability,
129                ));
130            }
131        };
132        Ok(((durability, disk_size), Some(Box::new(start_snapshot_watcher))))
133    }
134}
135
136#[async_trait]
137impl NodeDelegate for StandaloneEnv {
138    fn gather_metrics(&self) -> Vec<prometheus::proto::MetricFamily> {
139        self.metrics_registry.gather()
140    }
141
142    fn client_actor_index(&self) -> &ClientActorIndex {
143        &self.client_actor_index
144    }
145
146    type JwtAuthProviderT = auth::DefaultJwtAuthProvider;
147
148    fn jwt_auth_provider(&self) -> &Self::JwtAuthProviderT {
149        &self.auth_provider
150    }
151
152    async fn leader(&self, database_id: u64) -> anyhow::Result<Option<Host>> {
153        let leader = match self.control_db.get_leader_replica_by_database(database_id) {
154            Some(leader) => leader,
155            None => return Ok(None),
156        };
157
158        let database = self
159            .control_db
160            .get_database_by_id(database_id)?
161            .with_context(|| format!("Database {database_id} not found"))?;
162
163        self.host_controller
164            .get_or_launch_module_host(database, leader.id)
165            .await
166            .context("failed to get or launch module host")?;
167
168        Ok(Some(Host::new(leader.id, self.host_controller.clone())))
169    }
170    fn module_logs_dir(&self, replica_id: u64) -> ModuleLogsDir {
171        self.data_dir().replica(replica_id).module_logs()
172    }
173}
174
175impl spacetimedb_client_api::ControlStateReadAccess for StandaloneEnv {
176    // Nodes
177    fn get_node_id(&self) -> Option<u64> {
178        Some(0)
179    }
180
181    fn get_node_by_id(&self, node_id: u64) -> anyhow::Result<Option<Node>> {
182        if node_id == 0 {
183            return Ok(Some(Node {
184                id: 0,
185                unschedulable: false,
186                advertise_addr: Some("node:80".to_owned()),
187            }));
188        }
189        Ok(None)
190    }
191
192    fn get_nodes(&self) -> anyhow::Result<Vec<Node>> {
193        Ok(vec![self.get_node_by_id(0)?.unwrap()])
194    }
195
196    // Databases
197    fn get_database_by_id(&self, id: u64) -> anyhow::Result<Option<Database>> {
198        Ok(self.control_db.get_database_by_id(id)?)
199    }
200
201    fn get_database_by_identity(&self, database_identity: &Identity) -> anyhow::Result<Option<Database>> {
202        Ok(self.control_db.get_database_by_identity(database_identity)?)
203    }
204
205    fn get_databases(&self) -> anyhow::Result<Vec<Database>> {
206        Ok(self.control_db.get_databases()?)
207    }
208
209    // Replicas
210    fn get_replica_by_id(&self, id: u64) -> anyhow::Result<Option<Replica>> {
211        Ok(self.control_db.get_replica_by_id(id)?)
212    }
213
214    fn get_replicas(&self) -> anyhow::Result<Vec<Replica>> {
215        Ok(self.control_db.get_replicas()?)
216    }
217
218    fn get_leader_replica_by_database(&self, database_id: u64) -> Option<Replica> {
219        self.control_db.get_leader_replica_by_database(database_id)
220    }
221    // Energy
222    fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result<Option<EnergyBalance>> {
223        Ok(self.control_db.get_energy_balance(identity)?)
224    }
225
226    // DNS
227    fn lookup_identity(&self, domain: &str) -> anyhow::Result<Option<Identity>> {
228        Ok(self.control_db.spacetime_dns(domain)?)
229    }
230
231    fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result<Vec<DomainName>> {
232        Ok(self.control_db.spacetime_reverse_dns(database_identity)?)
233    }
234}
235
236#[async_trait]
237impl spacetimedb_client_api::ControlStateWriteAccess for StandaloneEnv {
238    async fn publish_database(
239        &self,
240        publisher: &Identity,
241        spec: spacetimedb_client_api::DatabaseDef,
242    ) -> anyhow::Result<Option<UpdateDatabaseResult>> {
243        let existing_db = self.control_db.get_database_by_identity(&spec.database_identity)?;
244
245        // standalone does not support replication.
246        let num_replicas = 1;
247
248        match existing_db {
249            // The database does not already exist, so we'll create it.
250            None => {
251                let program = Program::from_bytes(&spec.program_bytes[..]);
252
253                let mut database = Database {
254                    id: 0,
255                    database_identity: spec.database_identity,
256                    owner_identity: *publisher,
257                    host_type: spec.host_type,
258                    initial_program: program.hash,
259                };
260
261                let _hash_for_assert = program.hash;
262
263                // Instantiate a temporary database in order to check that the module is valid.
264                // This will e.g. typecheck RLS filters.
265                self.host_controller
266                    .check_module_validity(database.clone(), program)
267                    .await?;
268
269                let program_hash = self.program_store.put(&spec.program_bytes).await?;
270
271                debug_assert_eq!(_hash_for_assert, program_hash);
272
273                let database_id = self.control_db.insert_database(database.clone())?;
274                database.id = database_id;
275
276                self.schedule_replicas(database_id, num_replicas).await?;
277
278                Ok(None)
279            }
280            // The database already exists, so we'll try to update it.
281            // If that fails, we'll keep the old one.
282            Some(database) => {
283                ensure!(
284                    &database.owner_identity == publisher,
285                    "Permission denied: `{}` does not own database `{}`",
286                    publisher,
287                    spec.database_identity.to_abbreviated_hex()
288                );
289
290                let database_id = database.id;
291                let database_identity = database.database_identity;
292
293                let leader = self
294                    .leader(database_id)
295                    .await?
296                    .ok_or_else(|| anyhow::anyhow!("No leader for database"))?;
297                let update_result = leader
298                    .update(database, spec.host_type, spec.program_bytes.into())
299                    .await?;
300                if update_result.was_successful() {
301                    let replicas = self.control_db.get_replicas_by_database(database_id)?;
302                    let desired_replicas = num_replicas as usize;
303                    if desired_replicas == 0 {
304                        log::info!("Decommissioning all replicas of database {database_identity}");
305                        for instance in replicas {
306                            self.delete_replica(instance.id).await?;
307                        }
308                    } else if desired_replicas > replicas.len() {
309                        let n = desired_replicas - replicas.len();
310                        log::info!(
311                            "Scaling up database {} from {} to {} replicas",
312                            database_identity,
313                            replicas.len(),
314                            n
315                        );
316                        for _ in 0..n {
317                            self.insert_replica(Replica {
318                                id: 0,
319                                database_id,
320                                node_id: 0,
321                                leader: false,
322                            })
323                            .await?;
324                        }
325                    } else if desired_replicas < replicas.len() {
326                        let n = replicas.len() - desired_replicas;
327                        log::info!(
328                            "Scaling down database {} from {} to {} replicas",
329                            database_identity,
330                            replicas.len(),
331                            n
332                        );
333                        for instance in replicas.into_iter().filter(|instance| !instance.leader).take(n) {
334                            self.delete_replica(instance.id).await?;
335                        }
336                    } else {
337                        log::debug!(
338                            "Desired replica count {desired_replicas} for database {database_identity} already satisfied"
339                        );
340                    }
341                }
342
343                anyhow::Ok(Some(update_result))
344            }
345        }
346    }
347
348    async fn delete_database(&self, caller_identity: &Identity, database_identity: &Identity) -> anyhow::Result<()> {
349        let Some(database) = self.control_db.get_database_by_identity(database_identity)? else {
350            return Ok(());
351        };
352        anyhow::ensure!(
353            &database.owner_identity == caller_identity,
354            // TODO: `PermissionDenied` should be a variant of `Error`,
355            //       so we can match on it and return better error responses
356            //       from HTTP endpoints.
357            "Permission denied: `{caller_identity}` does not own database `{}`",
358            database_identity.to_abbreviated_hex()
359        );
360
361        self.control_db.delete_database(database.id)?;
362
363        for instance in self.control_db.get_replicas_by_database(database.id)? {
364            self.delete_replica(instance.id).await?;
365        }
366
367        Ok(())
368    }
369
370    async fn add_energy(&self, identity: &Identity, amount: EnergyQuanta) -> anyhow::Result<()> {
371        let balance = self
372            .control_db
373            .get_energy_balance(identity)?
374            .unwrap_or(EnergyBalance::ZERO);
375
376        let balance = balance.saturating_add_energy(amount);
377
378        self.control_db.set_energy_balance(*identity, balance)?;
379        Ok(())
380    }
381    async fn withdraw_energy(&self, _identity: &Identity, _amount: EnergyQuanta) -> anyhow::Result<()> {
382        // The energy balance code is obsolete.
383        Ok(())
384    }
385
386    async fn register_tld(&self, identity: &Identity, tld: Tld) -> anyhow::Result<RegisterTldResult> {
387        Ok(self.control_db.spacetime_register_tld(tld, *identity)?)
388    }
389
390    async fn create_dns_record(
391        &self,
392        owner_identity: &Identity,
393        domain: &DomainName,
394        database_identity: &Identity,
395    ) -> anyhow::Result<InsertDomainResult> {
396        Ok(self
397            .control_db
398            .spacetime_insert_domain(database_identity, domain.clone(), *owner_identity, true)?)
399    }
400
401    async fn replace_dns_records(
402        &self,
403        database_identity: &Identity,
404        owner_identity: &Identity,
405        domain_names: &[DomainName],
406    ) -> anyhow::Result<SetDomainsResult> {
407        Ok(self
408            .control_db
409            .spacetime_replace_domains(database_identity, owner_identity, domain_names)?)
410    }
411}
412
413impl StandaloneEnv {
414    async fn insert_replica(&self, replica: Replica) -> Result<(), anyhow::Error> {
415        let mut new_replica = replica.clone();
416        let id = self.control_db.insert_replica(replica)?;
417        new_replica.id = id;
418
419        self.on_insert_replica(&new_replica).await?;
420
421        Ok(())
422    }
423
424    async fn delete_replica(&self, replica_id: u64) -> Result<(), anyhow::Error> {
425        self.control_db.delete_replica(replica_id)?;
426        self.on_delete_replica(replica_id).await?;
427
428        Ok(())
429    }
430
431    async fn schedule_replicas(&self, database_id: u64, num_replicas: u8) -> Result<(), anyhow::Error> {
432        // Just scheduling a bunch of replicas to the only machine
433        for i in 0..num_replicas {
434            let replica = Replica {
435                id: 0,
436                database_id,
437                node_id: 0,
438                leader: i == 0,
439            };
440            self.insert_replica(replica).await?;
441        }
442
443        Ok(())
444    }
445
446    async fn on_insert_replica(&self, instance: &Replica) -> Result<(), anyhow::Error> {
447        if instance.leader {
448            let database = self
449                .control_db
450                .get_database_by_id(instance.database_id)?
451                .with_context(|| {
452                    format!(
453                        "unknown database: id: {}, instance: {}",
454                        instance.database_id, instance.id
455                    )
456                })?;
457            self.leader(database.id).await?;
458        }
459
460        Ok(())
461    }
462
463    async fn on_delete_replica(&self, replica_id: u64) -> anyhow::Result<()> {
464        // TODO(cloutiertyler): We should think about how to clean up
465        // replicas which have been deleted. This will just drop
466        // them from memory, but will not remove them from disk.  We need
467        // some kind of database lifecycle manager long term.
468        self.host_controller.exit_module_host(replica_id).await?;
469
470        Ok(())
471    }
472}
473
474impl HasWebSocketOptions for StandaloneEnv {
475    fn websocket_options(&self) -> WebSocketOptions {
476        self.websocket_options
477    }
478}
479
480pub async fn exec_subcommand(cmd: &str, args: &ArgMatches, db_cores: JobCores) -> Result<(), anyhow::Error> {
481    match cmd {
482        "start" => start::exec(args, db_cores).await,
483        "extract-schema" => extract_schema::exec(args).await,
484        unknown => Err(anyhow::anyhow!("Invalid subcommand: {}", unknown)),
485    }
486}
487
488pub fn get_subcommands() -> Vec<Command> {
489    vec![start::cli(), extract_schema::cli()]
490}
491
492pub async fn start_server(data_dir: &ServerDataDir, cert_dir: Option<&std::path::Path>) -> anyhow::Result<()> {
493    let mut args: Vec<&std::ffi::OsStr> = vec!["start".as_ref(), "--data-dir".as_ref(), data_dir.0.as_os_str()];
494    if let Some(cert_dir) = &cert_dir {
495        args.extend(["--jwt-key-dir".as_ref(), cert_dir.as_os_str()])
496    }
497    let args = start::cli().try_get_matches_from(args)?;
498    start::exec(&args, JobCores::default()).await
499}
500
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;
    use spacetimedb::db::Storage;
    use spacetimedb_paths::{cli::*, FromPathUnchecked};
    use std::fs;
    use tempfile::TempDir;

    /// While one `StandaloneEnv` is alive, a second `init` on the same data
    /// directory must fail.
    #[tokio::test]
    async fn ensure_init_grabs_lock() -> Result<()> {
        let tmp = TempDir::new()?;
        // Keys and data live in sibling subdirectories of the temp dir.
        let key_dir = tmp.path().join("keys");
        let data_root = tmp.path().join("data");
        let data_dir = Arc::new(ServerDataDir::from_path_unchecked(data_root));

        fs::create_dir(&key_dir)?;
        data_dir.create()?;

        let ca = CertificateAuthority {
            jwt_pub_key_path: PubKeyPath(key_dir.join("public")),
            jwt_priv_key_path: PrivKeyPath(key_dir.join("private")),
        };

        // Generate the key pair up front.
        ca.get_or_create_keys()?;

        let options = StandaloneOptions {
            db_config: db::Config {
                storage: Storage::Memory,
                page_pool_max_size: None,
            },
            websocket: WebSocketOptions::default(),
        };

        // Bound to a named `_` variable so the first env stays alive until the end of the test.
        let _held = StandaloneEnv::init(options, &ca, data_dir.clone(), Default::default()).await?;
        let second = StandaloneEnv::init(options, &ca, data_dir.clone(), Default::default()).await;
        assert!(second.is_err());

        Ok(())
    }
}
544        Ok(())
545    }
546}