spacetimedb/
database_logger.rs

1use chrono::{NaiveDate, Utc};
2use parking_lot::Mutex;
3use std::fs::File;
4use std::io::{self, Read, Seek, Write};
5use tokio::sync::broadcast;
6
7use spacetimedb_paths::server::{ModuleLogPath, ModuleLogsDir};
8
9use crate::util::asyncify;
10
11pub struct DatabaseLogger {
12    inner: Mutex<DatabaseLoggerInner>,
13    pub tx: broadcast::Sender<bytes::Bytes>,
14}
15
16struct DatabaseLoggerInner {
17    file: File,
18    date: NaiveDate,
19    path: ModuleLogPath,
20}
21
22#[derive(Clone, Copy, PartialEq, Eq, Debug, serde::Deserialize)]
23pub enum LogLevel {
24    Error,
25    Warn,
26    Info,
27    Debug,
28    Trace,
29    Panic,
30}
31
32impl From<u8> for LogLevel {
33    fn from(level: u8) -> Self {
34        match level {
35            0 => LogLevel::Error,
36            1 => LogLevel::Warn,
37            2 => LogLevel::Info,
38            3 => LogLevel::Debug,
39            4 => LogLevel::Trace,
40            101 => LogLevel::Panic,
41            _ => LogLevel::Debug,
42        }
43    }
44}
45
46#[serde_with::skip_serializing_none]
47#[serde_with::serde_as]
48#[derive(serde::Serialize, Copy, Clone)]
49pub struct Record<'a> {
50    #[serde_as(as = "serde_with::TimestampMicroSeconds")]
51    pub ts: chrono::DateTime<Utc>,
52    pub target: Option<&'a str>,
53    pub filename: Option<&'a str>,
54    pub line_number: Option<u32>,
55    pub message: &'a str,
56}
57
58pub trait BacktraceProvider {
59    fn capture(&self) -> Box<dyn ModuleBacktrace>;
60}
61
62impl BacktraceProvider for () {
63    fn capture(&self) -> Box<dyn ModuleBacktrace> {
64        Box::new(())
65    }
66}
67
68pub trait ModuleBacktrace {
69    fn frames(&self) -> Vec<BacktraceFrame<'_>>;
70}
71
72impl ModuleBacktrace for () {
73    fn frames(&self) -> Vec<BacktraceFrame<'_>> {
74        vec![]
75    }
76}
77
78#[serde_with::skip_serializing_none]
79#[serde_with::serde_as]
80#[derive(serde::Serialize)]
81pub struct BacktraceFrame<'a> {
82    #[serde_as(as = "Option<DemangleSymbol>")]
83    pub module_name: Option<&'a str>,
84    #[serde_as(as = "Option<DemangleSymbol>")]
85    pub func_name: Option<&'a str>,
86}
87
88struct DemangleSymbol;
89impl serde_with::SerializeAs<&str> for DemangleSymbol {
90    fn serialize_as<S>(source: &&str, serializer: S) -> Result<S::Ok, S::Error>
91    where
92        S: serde::Serializer,
93    {
94        if let Ok(sym) = rustc_demangle::try_demangle(source) {
95            serializer.serialize_str(&sym.to_string())
96        } else {
97            serializer.serialize_str(source)
98        }
99    }
100}
101
102#[serde_with::skip_serializing_none]
103#[derive(serde::Serialize)]
104#[serde(tag = "level")]
105enum LogEvent<'a> {
106    Error(Record<'a>),
107    Warn(Record<'a>),
108    Info(Record<'a>),
109    Debug(Record<'a>),
110    Trace(Record<'a>),
111    Panic {
112        #[serde(flatten)]
113        record: Record<'a>,
114        trace: &'a [BacktraceFrame<'a>],
115    },
116}
117
118impl DatabaseLoggerInner {
119    fn open(path: ModuleLogPath) -> io::Result<Self> {
120        let date = path.date();
121        let file = path.open_file(File::options().create(true).append(true))?;
122        Ok(Self { file, date, path })
123    }
124}
125
126impl DatabaseLogger {
127    pub fn open_today(logs_dir: ModuleLogsDir) -> Self {
128        Self::open(logs_dir.today())
129    }
130
131    pub fn open(path: ModuleLogPath) -> Self {
132        let inner = Mutex::new(DatabaseLoggerInner::open(path).unwrap());
133        let (tx, _) = broadcast::channel(64);
134        Self { inner, tx }
135    }
136
137    #[tracing::instrument(level = "trace", name = "DatabaseLogger::size", skip(self), err)]
138    pub fn size(&self) -> io::Result<u64> {
139        Ok(self.inner.lock().file.metadata()?.len())
140    }
141
142    pub fn write(&self, level: LogLevel, &record: &Record<'_>, bt: &dyn BacktraceProvider) {
143        let (trace, frames);
144        let event = match level {
145            LogLevel::Error => LogEvent::Error(record),
146            LogLevel::Warn => LogEvent::Warn(record),
147            LogLevel::Info => LogEvent::Info(record),
148            LogLevel::Debug => LogEvent::Debug(record),
149            LogLevel::Trace => LogEvent::Trace(record),
150            LogLevel::Panic => {
151                trace = bt.capture();
152                frames = trace.frames();
153                LogEvent::Panic { record, trace: &frames }
154            }
155        };
156        let mut buf = serde_json::to_string(&event).unwrap();
157        buf.push('\n');
158        let mut inner = self.inner.lock();
159        let record_date = record.ts.date_naive();
160        if record_date > inner.date {
161            let new_path = inner.path.with_date(record_date);
162            *inner = DatabaseLoggerInner::open(new_path).unwrap();
163        }
164        inner.file.write_all(buf.as_bytes()).unwrap();
165        let _ = self.tx.send(buf.into());
166    }
167
168    pub async fn read_latest(logs_dir: ModuleLogsDir, num_lines: Option<u32>) -> String {
169        // TODO: do we want to logs from across multiple files?
170
171        let Some(num_lines) = num_lines else {
172            let path = logs_dir.today();
173            // look for the most recent logfile.
174            match tokio::fs::read_to_string(&path).await {
175                Ok(contents) => return contents,
176                Err(e) if e.kind() == io::ErrorKind::NotFound => {}
177                Err(e) => panic!("couldn't read log file: {e}"),
178            }
179            // if there's none for today, read the directory and
180            let logs_dir = path.popped();
181            return asyncify(move || match logs_dir.most_recent()? {
182                Some(newest_log_file) => std::fs::read_to_string(newest_log_file),
183                None => Ok(String::new()),
184            })
185            .await
186            .expect("couldn't read log file");
187        };
188
189        if num_lines == 0 {
190            return String::new();
191        }
192
193        asyncify(move || read_latest_lines(logs_dir, num_lines))
194            .await
195            .expect("couldn't read log file")
196    }
197
198    pub fn system_logger(&self) -> &SystemLogger {
199        // SAFETY: SystemLogger is repr(transparent) over DatabaseLogger
200        unsafe { &*(self as *const DatabaseLogger as *const SystemLogger) }
201    }
202}
203
204fn read_latest_lines(logs_dir: ModuleLogsDir, num_lines: u32) -> io::Result<String> {
205    use std::fs::File;
206    let path = logs_dir.today();
207    let mut file = match File::open(&path) {
208        Ok(f) => f,
209        Err(e) if e.kind() == io::ErrorKind::NotFound => {
210            let Some(path) = path.popped().most_recent()? else {
211                return Ok(String::new());
212            };
213            File::open(path)?
214        }
215        Err(e) => return Err(e),
216    };
217    let mut lines_read: u32 = 0;
218    // rough guess of an appropriate size for a chunk that could get all the lines in one,
219    // assuming a line is roughly 150 bytes long, but capping our buffer size at 64KiB
220    let chunk_size = std::cmp::min((num_lines as u64 * 150).next_power_of_two(), 0x10_000);
221    let mut buf = vec![0; chunk_size as usize];
222    // the file should end in a newline, so we skip that one character
223    let mut pos = file.seek(io::SeekFrom::End(0))?.saturating_sub(1) as usize;
224    'outer: while pos > 0 {
225        let (new_pos, buf) = match pos.checked_sub(buf.len()) {
226            Some(pos) => (pos, &mut buf[..]),
227            None => (0, &mut buf[..pos]),
228        };
229        pos = new_pos;
230        read_exact_at(&file, buf, pos as u64)?;
231        for lf_pos in memchr::Memchr::new(b'\n', buf).rev() {
232            lines_read += 1;
233            if lines_read >= num_lines {
234                pos += lf_pos + 1;
235                break 'outer;
236            }
237        }
238    }
239    file.seek(io::SeekFrom::Start(pos as u64))?;
240    buf.clear();
241    let mut buf = String::from_utf8(buf).unwrap();
242    file.read_to_string(&mut buf)?;
243    Ok(buf)
244}
245
246fn read_exact_at(file: &std::fs::File, buf: &mut [u8], offset: u64) -> io::Result<()> {
247    #[cfg(unix)]
248    {
249        use std::os::unix::fs::FileExt;
250        file.read_exact_at(buf, offset)
251    }
252    #[cfg(not(unix))]
253    {
254        (&*file).seek(io::SeekFrom::Start(offset))?;
255        (&*file).read_exact(buf)
256    }
257}
258
259/// Somewhat ad-hoc wrapper around [`DatabaseLogger`] which allows to inject
260/// "system messages" into the user-retrievable database / module log
261#[repr(transparent)]
262pub struct SystemLogger {
263    inner: DatabaseLogger,
264}
265
266impl SystemLogger {
267    pub fn info(&self, msg: &str) {
268        self.inner.write(LogLevel::Info, &Self::record(msg), &())
269    }
270
271    pub fn warn(&self, msg: &str) {
272        self.inner.write(LogLevel::Warn, &Self::record(msg), &())
273    }
274
275    pub fn error(&self, msg: &str) {
276        self.inner.write(LogLevel::Error, &Self::record(msg), &())
277    }
278
279    fn record(message: &str) -> Record {
280        Record {
281            ts: Utc::now(),
282            target: None,
283            filename: Some("spacetimedb"),
284            line_number: None,
285            message,
286        }
287    }
288}