// spacetimedb/startup.rs

1use crossbeam_queue::ArrayQueue;
2use itertools::Itertools;
3use spacetimedb_paths::server::{ConfigToml, LogsDir};
4use std::path::PathBuf;
5use std::time::Duration;
6use tracing_appender::rolling;
7use tracing_core::LevelFilter;
8use tracing_flame::FlameLayer;
9use tracing_subscriber::fmt::writer::BoxMakeWriter;
10use tracing_subscriber::fmt::writer::MakeWriterExt;
11use tracing_subscriber::layer::SubscriberExt;
12use tracing_subscriber::prelude::*;
13use tracing_subscriber::{reload, EnvFilter};
14
15use crate::config::{ConfigFile, LogConfig};
16use crate::util::jobs::JobCores;
17
18pub use core_affinity::CoreId;
19
/// Options controlling how [`configure_tracing`] sets up the global
/// `tracing` subscriber.
pub struct TracingOptions {
    /// The log filter configuration (default level and filter directives).
    pub config: LogConfig,
    /// Whether or not to periodically reload the log config in the background.
    pub reload_config: Option<ConfigToml>,
    /// Whether or not to write logs to disk.
    pub disk_logging: Option<LogsDir>,
    /// The edition of this spacetime server.
    pub edition: String,
    /// Enables tracy profiling.
    pub tracy: bool,
    /// If `Some`, write `tracing-flame` flamegraph data to this file
    /// (flushed periodically by [`configure_tracing`]).
    pub flamegraph: Option<PathBuf>,
}
32
33impl Default for TracingOptions {
34    fn default() -> Self {
35        Self {
36            config: LogConfig::default(),
37            reload_config: None,
38            disk_logging: None,
39            edition: "standalone".to_owned(),
40            tracy: false,
41            flamegraph: None,
42        }
43    }
44}
45
/// Install the global `tracing` subscriber according to `opts`.
///
/// Builds a compact fmt layer writing to stdout (optionally tee'd to a
/// rolling log file), an [`EnvFilter`] from `opts.config`, and optional
/// Tracy / flamegraph layers, then sets the combination as the global
/// default subscriber.
///
/// NOTE(review): when `opts.flamegraph` is `Some`, this calls `tokio::spawn`,
/// which panics unless invoked from within a Tokio runtime — confirm all
/// callers configure tracing inside the runtime in that case.
pub fn configure_tracing(opts: TracingOptions) {
    // Use this to change log levels at runtime.
    // This means you can change the default log level to trace
    // if you are trying to debug an issue and need more logs on then turn it off
    // once you are done.

    let timer = tracing_subscriber::fmt::time();
    let format = tracing_subscriber::fmt::format::Format::default()
        .with_timer(timer)
        .with_line_number(true)
        .with_file(true)
        .with_target(false)
        .compact();

    // Always log to stdout; additionally roll files in `logs_dir` when
    // disk logging is enabled.
    let write_to = if let Some(logs_dir) = opts.disk_logging {
        let roller = rolling::Builder::new()
            .filename_prefix(LogsDir::filename_prefix(&opts.edition))
            .filename_suffix(LogsDir::filename_extension())
            .build(logs_dir)
            .unwrap();
        // TODO: syslog?
        BoxMakeWriter::new(std::io::stdout.and(roller))
    } else {
        BoxMakeWriter::new(std::io::stdout)
    };

    let fmt_layer = tracing_subscriber::fmt::Layer::default()
        .with_writer(write_to)
        .event_format(format);

    let env_filter_layer = conf_to_filter(opts.config);

    let tracy_layer = if opts.tracy {
        Some(tracing_tracy::TracyLayer::new())
    } else {
        None
    };

    // Both the flush guard and the layer are optional and only exist when a
    // flamegraph path was supplied.
    let (flame_guard, flame_layer) = if let Some(flamegraph_path) = opts.flamegraph {
        let (flame_layer, guard) = FlameLayer::with_file(flamegraph_path).unwrap();
        let flame_layer = flame_layer.with_file_and_line(false).with_empty_samples(false);
        (Some(guard), Some(flame_layer))
    } else {
        (None, None)
    };

    // Is important for `tracy_layer` to be before `fmt_layer` to not print ascii codes...
    let subscriber = tracing_subscriber::Registry::default()
        .with(tracy_layer)
        .with(fmt_layer)
        .with(flame_layer);

    if let Some(conf_file) = opts.reload_config {
        // Wrap the filter in a reload layer and let a background thread poll
        // the config file for changes (see `reload_config` below).
        let (reload_layer, reload_handle) = tracing_subscriber::reload::Layer::new(env_filter_layer);
        std::thread::spawn(move || reload_config(&conf_file, &reload_handle));
        subscriber.with(reload_layer).init();
    } else {
        subscriber.with(env_filter_layer).init();
    };

    // Periodically flush buffered flamegraph samples so the file stays
    // current even though the process never tears the guard down.
    if let Some(guard) = flame_guard {
        tokio::spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_secs(5)).await;
                guard.flush().unwrap();
            }
        });
    }
}
115
116fn conf_to_filter(conf: LogConfig) -> EnvFilter {
117    EnvFilter::builder()
118        .with_default_directive(conf.level.unwrap_or(LevelFilter::ERROR).into())
119        .parse_lossy(conf.directives.join(","))
120}
121
122fn parse_from_file(path: &ConfigToml) -> EnvFilter {
123    let conf = match ConfigFile::read(path) {
124        Ok(Some(conf)) => conf.logs,
125        Ok(None) => LogConfig::default(),
126        #[allow(clippy::disallowed_macros)]
127        Err(e) => {
128            eprintln!("error reading config.toml for logconf reloading: {e}");
129            LogConfig::default()
130        }
131    };
132    conf_to_filter(conf)
133}
134
135const RELOAD_INTERVAL: Duration = Duration::from_secs(5);
136fn reload_config<S>(conf_file: &ConfigToml, reload_handle: &reload::Handle<EnvFilter, S>) {
137    let mut prev_time = conf_file.metadata().and_then(|m| m.modified()).ok();
138    loop {
139        std::thread::sleep(RELOAD_INTERVAL);
140        if let Ok(modified) = conf_file.metadata().and_then(|m| m.modified()) {
141            if prev_time.is_none_or(|prev| modified > prev) {
142                log::info!("reloading log config...");
143                prev_time = Some(modified);
144                if reload_handle.reload(parse_from_file(conf_file)).is_err() {
145                    break;
146                }
147            }
148        }
149    }
150}
151
/// Divide up the available CPU cores into pools for different purposes.
///
/// Uses the [`CoreReservations`] defaults; see
/// [`pin_threads_with_reservations`] to customize the split.
///
/// Use the fields of the returned [`Cores`] value to actually configure
/// cores to be pinned.
///
/// Pinning different subsystems to different threads reduces overhead from
/// unnecessary context switching.
///
/// * Database instances are critical to overall performance, and keeping each
///   one on only one thread was shown to significantly increase transaction throughput.
/// * Tokio and Rayon have their own userspace task schedulers, so if the OS
///   scheduler is trying to schedule threads as well, it's likely to just
///   cause interference.
///
/// Call only once per process. If obtaining the number of cores fails, or if
/// there are too few cores, this function may return `Cores::default()`, which
/// performs no thread pinning.
// TODO: pinning threads might not be desirable on a machine with other
//       processes running - this should probably be some sort of flag.
#[must_use]
pub fn pin_threads() -> Cores {
    pin_threads_with_reservations(CoreReservations::default())
}
175
176/// Like [`pin_threads`], but with a custom [`CoreReservations`].
177#[must_use]
178pub fn pin_threads_with_reservations(reservations: CoreReservations) -> Cores {
179    Cores::get(reservations).unwrap_or_default()
180}
181
/// The desired distribution of available cores to purposes.
///
/// Note that, in addition to `reserved`, [`Cores`] reserves two additional
/// cores for the operating system. That is, the denominator for fractions
/// given below is `num_cpus - reserved - 2`.
///
/// NOTE(review): in [`CoreReservations::apply`], the fractional reservations
/// are computed against what remains after draining the `irq` (default 2)
/// and `reserved` cores — confirm the paragraph above describes the same
/// arithmetic.
pub struct CoreReservations {
    /// Cores to run database instances on.
    ///
    /// Default: 1/8
    pub databases: f64,
    /// Cores to run tokio worker threads on.
    ///
    /// Default: 4/8
    pub tokio_workers: f64,
    /// Cores to run rayon threads on.
    ///
    /// Default: 1/8
    pub rayon: f64,
    /// Cores to reserve for IRQ handling.
    ///
    /// This will be the first `n` [`CoreId`]s in the list.
    /// Only make use of this if you're configuring the machine for IRQ pinning!
    ///
    /// Default: 2
    pub irq: usize,
    /// Extra reserved cores.
    ///
    /// If greater than zero, this many cores will be reserved _before_
    /// any of the other reservations are made (but after reserving the OS cores).
    ///
    /// Default: 0
    pub reserved: usize,
}
215
216impl Default for CoreReservations {
217    fn default() -> Self {
218        Self {
219            databases: 1.0 / 8.0,
220            tokio_workers: 4.0 / 8.0,
221            rayon: 1.0 / 8.0,
222            irq: 2,
223            reserved: 0,
224        }
225    }
226}
227
228impl CoreReservations {
229    /// Apply this reservation to an arbitrary list of core ids.
230    ///
231    /// Returns the allocated cores in the order:
232    ///
233    /// - irq
234    /// - reserved
235    /// - databases
236    /// - tokio_workers
237    /// - rayon
238    ///
239    /// Left public for testing and debugging purposes.
240    pub fn apply(&self, cores: &mut Vec<CoreId>) -> [Vec<CoreId>; 5] {
241        let irq = cores.drain(..self.irq).collect_vec();
242        let reserved = cores.drain(..self.reserved).collect_vec();
243
244        let total = cores.len() as f64;
245        let frac = |frac: f64| (total * frac).ceil() as usize;
246        fn claim(cores: &mut Vec<CoreId>, n: usize) -> impl Iterator<Item = CoreId> + '_ {
247            cores.drain(..n.min(cores.len()))
248        }
249
250        let databases = claim(cores, frac(self.databases)).collect_vec();
251        let tokio_workers = claim(cores, frac(self.tokio_workers)).collect_vec();
252        let rayon = claim(cores, frac(self.rayon)).collect_vec();
253
254        [irq, reserved, databases, tokio_workers, rayon]
255    }
256}
257
/// A type holding cores divvied up into different sets.
///
/// Obtained from [`pin_threads()`].
#[derive(Default)]
pub struct Cores {
    /// The cores to run database instances on.
    pub databases: JobCores,
    /// The cores to run tokio worker threads on.
    pub tokio: TokioCores,
    /// The cores to run rayon threads on.
    pub rayon: RayonCores,
    /// Extra cores if a [`CoreReservations`] with `reserved > 0` was used.
    ///
    /// If `Some`, the boxed array is non-empty.
    pub reserved: Option<Box<[CoreId]>>,
    /// Cores shared between tokio runtimes to schedule blocking tasks on.
    ///
    /// All remaining cores after [`CoreReservations`] have been made become
    /// blocking cores.
    ///
    /// See [`TokioCores::blocking`] for more context.
    #[cfg(target_os = "linux")]
    pub blocking: Option<nix::sched::CpuSet>,
}
282
impl Cores {
    /// Partition the host's cores according to `reservations`.
    ///
    /// Returns `None` when the core list is unavailable or too small
    /// (see [`Self::get_core_ids`]).
    fn get(reservations: CoreReservations) -> Option<Self> {
        let mut cores = Self::get_core_ids()?;

        let [_irq, reserved, databases, tokio_workers, rayon] = reservations.apply(&mut cores);

        let reserved = (!reserved.is_empty()).then(|| reserved.into());
        let databases = databases.into_iter().collect::<JobCores>();
        let rayon = RayonCores((!rayon.is_empty()).then_some(rayon));

        // see comment on `TokioCores.blocking`
        // Everything left in `cores` after `apply` becomes the blocking-thread
        // affinity mask; `try_fold` yields `None` if any id is rejected by
        // `CpuSet::set`.
        #[cfg(target_os = "linux")]
        let remaining = cores
            .into_iter()
            .try_fold(nix::sched::CpuSet::new(), |mut cpuset, core| {
                cpuset.set(core.id).ok()?;
                Some(cpuset)
            });

        // NOTE(review): `remaining` is used both here and for `Self::blocking`
        // below, which relies on `Option<CpuSet>` being `Copy` — confirm.
        let tokio = TokioCores {
            workers: Some(tokio_workers),
            #[cfg(target_os = "linux")]
            blocking: remaining,
        };

        Some(Self {
            databases,
            tokio,
            rayon,
            reserved,
            #[cfg(target_os = "linux")]
            blocking: remaining,
        })
    }

    /// Get the cores of the local host, as reported by the operating system.
    ///
    /// Returns `None` if the number of cores is less than 10
    /// (the code checks `cores.len() >= 10`; an earlier doc said 8).
    /// If `Some` is returned, the `Vec` is non-empty.
    pub fn get_core_ids() -> Option<Vec<CoreId>> {
        let cores = core_affinity::get_core_ids()
            .filter(|cores| cores.len() >= 10)?
            .into_iter()
            .collect_vec();

        (!cores.is_empty()).then_some(cores)
    }
}
331
/// Core-affinity configuration for a tokio runtime.
#[derive(Default)]
pub struct TokioCores {
    // One core per tokio worker thread; `TokioCores::configure` sets the
    // worker-thread count to `workers.len()`.
    pub workers: Option<Vec<CoreId>>,
    // For blocking threads, we don't want to limit them to a specific number
    // and pin them to their own cores - they're supposed to run concurrently
    // with each other. However, `core_affinity` doesn't support affinity masks,
    // so we just use the Linux-specific API, since this is only a slight boost
    // and we don't care enough about performance on other platforms.
    #[cfg(target_os = "linux")]
    pub blocking: Option<nix::sched::CpuSet>,
}
343
impl TokioCores {
    /// Configures `builder` to pin its worker threads to specific cores.
    ///
    /// No-op when `self.workers` is `None`.
    pub fn configure(self, builder: &mut tokio::runtime::Builder) {
        if let Some(cores) = self.workers {
            builder.worker_threads(cores.len());

            // Lock-free queue each newly spawned runtime thread pops its
            // pinned core from; once drained, later threads take the
            // blocking-thread path below.
            let cores_queue = Box::new(ArrayQueue::new(cores.len()));
            for core in cores {
                cores_queue.push(core).unwrap();
            }

            // `on_thread_start` gets called for both async worker threads and blocking threads,
            // but the first `worker_threads` threads that tokio spawns are worker threads,
            // so this ends up working fine
            builder.on_thread_start(move || {
                if let Some(core) = cores_queue.pop() {
                    core_affinity::set_for_current(core);
                } else {
                    // Blocking threads share an affinity mask rather than a
                    // dedicated core (Linux only; see `TokioCores::blocking`).
                    #[cfg(target_os = "linux")]
                    if let Some(cpuset) = &self.blocking {
                        // Pid 0 addresses the calling thread in sched_setaffinity.
                        let this = nix::unistd::Pid::from_raw(0);
                        let _ = nix::sched::sched_setaffinity(this, cpuset);
                    }
                }
            });
        }
    }
}
372
/// Cores to pin rayon worker threads to, if any.
#[derive(Default)]
pub struct RayonCores(Option<Vec<CoreId>>);
375
impl RayonCores {
    /// Configures a global rayon threadpool, pinning its threads to specific cores.
    ///
    /// All rayon threads will be run with `tokio_handle` entered into.
    ///
    /// Panics if a global rayon pool was already built (`build_global` is
    /// unwrapped), so call at most once per process.
    pub fn configure(self, tokio_handle: &tokio::runtime::Handle) {
        rayon_core::ThreadPoolBuilder::new()
            .thread_name(|_idx| "rayon-worker".to_string())
            .spawn_handler(thread_spawn_handler(tokio_handle))
            // When no cores were reserved this passes 0, which per rayon's
            // docs means "let rayon choose the thread count".
            .num_threads(self.0.as_ref().map_or(0, |cores| cores.len()))
            .start_handler(move |i| {
                // Pin each rayon thread to the core matching its index.
                if let Some(cores) = &self.0 {
                    core_affinity::set_for_current(cores[i]);
                }
            })
            .build_global()
            .unwrap()
    }
}
394
/// A Rayon [spawn_handler](https://docs.rs/rustc-rayon-core/latest/rayon_core/struct.ThreadPoolBuilder.html#method.spawn_handler)
/// which enters the given Tokio runtime at thread startup,
/// so that the Rayon workers can send along async channels.
///
/// Other than entering the `rt`, this spawn handler behaves identically to the default Rayon spawn handler,
/// as documented in
/// https://docs.rs/rustc-rayon-core/0.5.0/rayon_core/struct.ThreadPoolBuilder.html#method.spawn_handler
///
/// Having Rayon threads block on async operations is a code smell.
/// We need to be careful that the Rayon threads never actually block,
/// i.e. that every async operation they invoke immediately completes.
/// I (pgoldman 2024-02-22) believe that our Rayon threads only ever send to unbounded channels,
/// and therefore never wait.
fn thread_spawn_handler(
    rt: &tokio::runtime::Handle,
) -> impl FnMut(rayon::ThreadBuilder) -> Result<(), std::io::Error> + '_ {
    move |thread| {
        // Each worker thread gets its own clone of the runtime handle.
        let rt = rt.clone();
        // Mirror rayon's requested thread name and stack size, when given.
        let mut builder = std::thread::Builder::new();
        if let Some(name) = thread.name() {
            builder = builder.name(name.to_owned());
        }
        if let Some(stack_size) = thread.stack_size() {
            builder = builder.stack_size(stack_size);
        }
        builder.spawn(move || {
            // Keep the runtime entered for the thread's entire lifetime.
            let _rt_guard = rt.enter();
            thread.run()
        })?;
        Ok(())
    }
}