1use crossbeam_queue::ArrayQueue;
2use itertools::Itertools;
3use spacetimedb_paths::server::{ConfigToml, LogsDir};
4use std::path::PathBuf;
5use std::time::Duration;
6use tracing_appender::rolling;
7use tracing_core::LevelFilter;
8use tracing_flame::FlameLayer;
9use tracing_subscriber::fmt::writer::BoxMakeWriter;
10use tracing_subscriber::fmt::writer::MakeWriterExt;
11use tracing_subscriber::layer::SubscriberExt;
12use tracing_subscriber::prelude::*;
13use tracing_subscriber::{reload, EnvFilter};
14
15use crate::config::{ConfigFile, LogConfig};
16use crate::util::jobs::JobCores;
17
18pub use core_affinity::CoreId;
19
/// Options controlling how the global `tracing` subscriber is configured
/// by [`configure_tracing`].
pub struct TracingOptions {
    /// Log level / directive configuration for the env filter.
    pub config: LogConfig,
    /// When `Some`, watch this config file on a background thread and
    /// reload the log filter whenever it changes on disk.
    pub reload_config: Option<ConfigToml>,
    /// When `Some`, also write logs to rolling files in this directory
    /// (in addition to stdout).
    pub edition: String,
    /// Edition string used to derive the rolling log filename prefix
    /// (e.g. "standalone").
    pub disk_logging: Option<LogsDir>,
    /// Enable the Tracy profiler layer.
    pub tracy: bool,
    /// When `Some`, write tracing-flame flamegraph data to this path.
    pub flamegraph: Option<PathBuf>,
}
32
33impl Default for TracingOptions {
34 fn default() -> Self {
35 Self {
36 config: LogConfig::default(),
37 reload_config: None,
38 disk_logging: None,
39 edition: "standalone".to_owned(),
40 tracy: false,
41 flamegraph: None,
42 }
43 }
44}
45
46pub fn configure_tracing(opts: TracingOptions) {
47 let timer = tracing_subscriber::fmt::time();
53 let format = tracing_subscriber::fmt::format::Format::default()
54 .with_timer(timer)
55 .with_line_number(true)
56 .with_file(true)
57 .with_target(false)
58 .compact();
59
60 let write_to = if let Some(logs_dir) = opts.disk_logging {
61 let roller = rolling::Builder::new()
62 .filename_prefix(LogsDir::filename_prefix(&opts.edition))
63 .filename_suffix(LogsDir::filename_extension())
64 .build(logs_dir)
65 .unwrap();
66 BoxMakeWriter::new(std::io::stdout.and(roller))
68 } else {
69 BoxMakeWriter::new(std::io::stdout)
70 };
71
72 let fmt_layer = tracing_subscriber::fmt::Layer::default()
73 .with_writer(write_to)
74 .event_format(format);
75
76 let env_filter_layer = conf_to_filter(opts.config);
77
78 let tracy_layer = if opts.tracy {
79 Some(tracing_tracy::TracyLayer::new())
80 } else {
81 None
82 };
83
84 let (flame_guard, flame_layer) = if let Some(flamegraph_path) = opts.flamegraph {
85 let (flame_layer, guard) = FlameLayer::with_file(flamegraph_path).unwrap();
86 let flame_layer = flame_layer.with_file_and_line(false).with_empty_samples(false);
87 (Some(guard), Some(flame_layer))
88 } else {
89 (None, None)
90 };
91
92 let subscriber = tracing_subscriber::Registry::default()
94 .with(tracy_layer)
95 .with(fmt_layer)
96 .with(flame_layer);
97
98 if let Some(conf_file) = opts.reload_config {
99 let (reload_layer, reload_handle) = tracing_subscriber::reload::Layer::new(env_filter_layer);
100 std::thread::spawn(move || reload_config(&conf_file, &reload_handle));
101 subscriber.with(reload_layer).init();
102 } else {
103 subscriber.with(env_filter_layer).init();
104 };
105
106 if let Some(guard) = flame_guard {
107 tokio::spawn(async move {
108 loop {
109 tokio::time::sleep(Duration::from_secs(5)).await;
110 guard.flush().unwrap();
111 }
112 });
113 }
114}
115
116fn conf_to_filter(conf: LogConfig) -> EnvFilter {
117 EnvFilter::builder()
118 .with_default_directive(conf.level.unwrap_or(LevelFilter::ERROR).into())
119 .parse_lossy(conf.directives.join(","))
120}
121
122fn parse_from_file(path: &ConfigToml) -> EnvFilter {
123 let conf = match ConfigFile::read(path) {
124 Ok(Some(conf)) => conf.logs,
125 Ok(None) => LogConfig::default(),
126 #[allow(clippy::disallowed_macros)]
127 Err(e) => {
128 eprintln!("error reading config.toml for logconf reloading: {e}");
129 LogConfig::default()
130 }
131 };
132 conf_to_filter(conf)
133}
134
135const RELOAD_INTERVAL: Duration = Duration::from_secs(5);
136fn reload_config<S>(conf_file: &ConfigToml, reload_handle: &reload::Handle<EnvFilter, S>) {
137 let mut prev_time = conf_file.metadata().and_then(|m| m.modified()).ok();
138 loop {
139 std::thread::sleep(RELOAD_INTERVAL);
140 if let Ok(modified) = conf_file.metadata().and_then(|m| m.modified()) {
141 if prev_time.is_none_or(|prev| modified > prev) {
142 log::info!("reloading log config...");
143 prev_time = Some(modified);
144 if reload_handle.reload(parse_from_file(conf_file)).is_err() {
145 break;
146 }
147 }
148 }
149 }
150}
151
152#[must_use]
172pub fn pin_threads() -> Cores {
173 pin_threads_with_reservations(CoreReservations::default())
174}
175
176#[must_use]
178pub fn pin_threads_with_reservations(reservations: CoreReservations) -> Cores {
179 Cores::get(reservations).unwrap_or_default()
180}
181
/// How to divide the machine's cores among the process's thread pools.
///
/// The fractional fields are fractions of the cores remaining *after* the
/// fixed `irq` and `reserved` counts have been taken; see [`Self::apply`].
pub struct CoreReservations {
    /// Fraction of cores for database (job) threads.
    pub databases: f64,
    /// Fraction of cores for tokio worker threads.
    pub tokio_workers: f64,
    /// Fraction of cores for the rayon pool.
    pub rayon: f64,
    /// Number of cores to set aside first and leave unused by this process
    /// (presumably so the OS can use them for IRQ handling — the name
    /// suggests this; confirm with the deployment docs).
    pub irq: usize,
    /// Number of cores to hold back for the caller's own use; taken after
    /// `irq`, surfaced as [`Cores::reserved`].
    pub reserved: usize,
}
215
216impl Default for CoreReservations {
217 fn default() -> Self {
218 Self {
219 databases: 1.0 / 8.0,
220 tokio_workers: 4.0 / 8.0,
221 rayon: 1.0 / 8.0,
222 irq: 2,
223 reserved: 0,
224 }
225 }
226}
227
228impl CoreReservations {
229 pub fn apply(&self, cores: &mut Vec<CoreId>) -> [Vec<CoreId>; 5] {
241 let irq = cores.drain(..self.irq).collect_vec();
242 let reserved = cores.drain(..self.reserved).collect_vec();
243
244 let total = cores.len() as f64;
245 let frac = |frac: f64| (total * frac).ceil() as usize;
246 fn claim(cores: &mut Vec<CoreId>, n: usize) -> impl Iterator<Item = CoreId> + '_ {
247 cores.drain(..n.min(cores.len()))
248 }
249
250 let databases = claim(cores, frac(self.databases)).collect_vec();
251 let tokio_workers = claim(cores, frac(self.tokio_workers)).collect_vec();
252 let rayon = claim(cores, frac(self.rayon)).collect_vec();
253
254 [irq, reserved, databases, tokio_workers, rayon]
255 }
256}
257
/// Core assignments for the process's thread pools, as produced by
/// [`pin_threads`] / [`pin_threads_with_reservations`]. The `Default`
/// value means "no pinning".
#[derive(Default)]
pub struct Cores {
    // Cores for database (job) threads.
    pub databases: JobCores,
    // Cores for the tokio runtime's worker and blocking threads.
    pub tokio: TokioCores,
    // Cores for the global rayon pool.
    pub rayon: RayonCores,
    // Cores held back by the `reserved` reservation for the caller's own
    // use, if any were requested.
    pub reserved: Option<Box<[CoreId]>>,
    // Affinity mask built from the cores left over after all reservations
    // were taken (linux only).
    #[cfg(target_os = "linux")]
    pub blocking: Option<nix::sched::CpuSet>,
}
282
283impl Cores {
284 fn get(reservations: CoreReservations) -> Option<Self> {
285 let mut cores = Self::get_core_ids()?;
286
287 let [_irq, reserved, databases, tokio_workers, rayon] = reservations.apply(&mut cores);
288
289 let reserved = (!reserved.is_empty()).then(|| reserved.into());
290 let databases = databases.into_iter().collect::<JobCores>();
291 let rayon = RayonCores((!rayon.is_empty()).then_some(rayon));
292
293 #[cfg(target_os = "linux")]
295 let remaining = cores
296 .into_iter()
297 .try_fold(nix::sched::CpuSet::new(), |mut cpuset, core| {
298 cpuset.set(core.id).ok()?;
299 Some(cpuset)
300 });
301
302 let tokio = TokioCores {
303 workers: Some(tokio_workers),
304 #[cfg(target_os = "linux")]
305 blocking: remaining,
306 };
307
308 Some(Self {
309 databases,
310 tokio,
311 rayon,
312 reserved,
313 #[cfg(target_os = "linux")]
314 blocking: remaining,
315 })
316 }
317
318 pub fn get_core_ids() -> Option<Vec<CoreId>> {
323 let cores = core_affinity::get_core_ids()
324 .filter(|cores| cores.len() >= 10)?
325 .into_iter()
326 .collect_vec();
327
328 (!cores.is_empty()).then_some(cores)
329 }
330}
331
/// Core assignments for the tokio runtime; the `Default` value leaves the
/// runtime unpinned.
#[derive(Default)]
pub struct TokioCores {
    // One core per tokio worker thread, or `None` to leave tokio unpinned.
    pub workers: Option<Vec<CoreId>>,
    // Affinity mask applied to threads that don't get a dedicated worker
    // core (linux only); see `TokioCores::configure`.
    #[cfg(target_os = "linux")]
    pub blocking: Option<nix::sched::CpuSet>,
}
343
impl TokioCores {
    /// Apply these core assignments to a tokio runtime `builder`.
    ///
    /// Sets the worker-thread count to the number of assigned cores and
    /// installs a thread-start hook that pins each new worker thread to one
    /// of them. If `workers` is `None`, the builder is left untouched.
    pub fn configure(self, builder: &mut tokio::runtime::Builder) {
        if let Some(cores) = self.workers {
            builder.worker_threads(cores.len());

            // Lock-free queue of not-yet-claimed cores; each starting
            // thread pops one and pins itself to it.
            let cores_queue = Box::new(ArrayQueue::new(cores.len()));
            for core in cores {
                // Cannot fail: the queue's capacity is exactly the number
                // of cores being pushed.
                cores_queue.push(core).unwrap();
            }

            builder.on_thread_start(move || {
                if let Some(core) = cores_queue.pop() {
                    core_affinity::set_for_current(core);
                } else {
                    // Queue exhausted — presumably a thread beyond the
                    // worker count (e.g. a blocking-pool thread); confirm
                    // against tokio's on_thread_start semantics. On linux,
                    // confine it to the leftover cores instead.
                    #[cfg(target_os = "linux")]
                    if let Some(cpuset) = &self.blocking {
                        // Pid 0 means "the calling thread" for
                        // sched_setaffinity.
                        let this = nix::unistd::Pid::from_raw(0);
                        // Best-effort: ignore failures to set affinity.
                        let _ = nix::sched::sched_setaffinity(this, cpuset);
                    }
                }
            });
        }
    }
}
372
/// Core assignments for the global rayon pool; `None` (the default) leaves
/// the pool size and thread affinity up to rayon.
#[derive(Default)]
pub struct RayonCores(Option<Vec<CoreId>>);
375
376impl RayonCores {
377 pub fn configure(self, tokio_handle: &tokio::runtime::Handle) {
381 rayon_core::ThreadPoolBuilder::new()
382 .thread_name(|_idx| "rayon-worker".to_string())
383 .spawn_handler(thread_spawn_handler(tokio_handle))
384 .num_threads(self.0.as_ref().map_or(0, |cores| cores.len()))
385 .start_handler(move |i| {
386 if let Some(cores) = &self.0 {
387 core_affinity::set_for_current(cores[i]);
388 }
389 })
390 .build_global()
391 .unwrap()
392 }
393}
394
395fn thread_spawn_handler(
409 rt: &tokio::runtime::Handle,
410) -> impl FnMut(rayon::ThreadBuilder) -> Result<(), std::io::Error> + '_ {
411 move |thread| {
412 let rt = rt.clone();
413 let mut builder = std::thread::Builder::new();
414 if let Some(name) = thread.name() {
415 builder = builder.name(name.to_owned());
416 }
417 if let Some(stack_size) = thread.stack_size() {
418 builder = builder.stack_size(stack_size);
419 }
420 builder.spawn(move || {
421 let _rt_guard = rt.enter();
422 thread.run()
423 })?;
424 Ok(())
425 }
426}