1use chrono::{NaiveDate, Utc};
2use parking_lot::Mutex;
3use std::fs::File;
4use std::io::{self, Read, Seek, Write};
5use tokio::sync::broadcast;
6
7use spacetimedb_paths::server::{ModuleLogPath, ModuleLogsDir};
8
9use crate::util::asyncify;
10
11pub struct DatabaseLogger {
12 inner: Mutex<DatabaseLoggerInner>,
13 pub tx: broadcast::Sender<bytes::Bytes>,
14}
15
16struct DatabaseLoggerInner {
17 file: File,
18 date: NaiveDate,
19 path: ModuleLogPath,
20}
21
22#[derive(Clone, Copy, PartialEq, Eq, Debug, serde::Deserialize)]
23pub enum LogLevel {
24 Error,
25 Warn,
26 Info,
27 Debug,
28 Trace,
29 Panic,
30}
31
32impl From<u8> for LogLevel {
33 fn from(level: u8) -> Self {
34 match level {
35 0 => LogLevel::Error,
36 1 => LogLevel::Warn,
37 2 => LogLevel::Info,
38 3 => LogLevel::Debug,
39 4 => LogLevel::Trace,
40 101 => LogLevel::Panic,
41 _ => LogLevel::Debug,
42 }
43 }
44}
45
46#[serde_with::skip_serializing_none]
47#[serde_with::serde_as]
48#[derive(serde::Serialize, Copy, Clone)]
49pub struct Record<'a> {
50 #[serde_as(as = "serde_with::TimestampMicroSeconds")]
51 pub ts: chrono::DateTime<Utc>,
52 pub target: Option<&'a str>,
53 pub filename: Option<&'a str>,
54 pub line_number: Option<u32>,
55 pub message: &'a str,
56}
57
58pub trait BacktraceProvider {
59 fn capture(&self) -> Box<dyn ModuleBacktrace>;
60}
61
62impl BacktraceProvider for () {
63 fn capture(&self) -> Box<dyn ModuleBacktrace> {
64 Box::new(())
65 }
66}
67
68pub trait ModuleBacktrace {
69 fn frames(&self) -> Vec<BacktraceFrame<'_>>;
70}
71
72impl ModuleBacktrace for () {
73 fn frames(&self) -> Vec<BacktraceFrame<'_>> {
74 vec![]
75 }
76}
77
78#[serde_with::skip_serializing_none]
79#[serde_with::serde_as]
80#[derive(serde::Serialize)]
81pub struct BacktraceFrame<'a> {
82 #[serde_as(as = "Option<DemangleSymbol>")]
83 pub module_name: Option<&'a str>,
84 #[serde_as(as = "Option<DemangleSymbol>")]
85 pub func_name: Option<&'a str>,
86}
87
88struct DemangleSymbol;
89impl serde_with::SerializeAs<&str> for DemangleSymbol {
90 fn serialize_as<S>(source: &&str, serializer: S) -> Result<S::Ok, S::Error>
91 where
92 S: serde::Serializer,
93 {
94 if let Ok(sym) = rustc_demangle::try_demangle(source) {
95 serializer.serialize_str(&sym.to_string())
96 } else {
97 serializer.serialize_str(source)
98 }
99 }
100}
101
102#[serde_with::skip_serializing_none]
103#[derive(serde::Serialize)]
104#[serde(tag = "level")]
105enum LogEvent<'a> {
106 Error(Record<'a>),
107 Warn(Record<'a>),
108 Info(Record<'a>),
109 Debug(Record<'a>),
110 Trace(Record<'a>),
111 Panic {
112 #[serde(flatten)]
113 record: Record<'a>,
114 trace: &'a [BacktraceFrame<'a>],
115 },
116}
117
118impl DatabaseLoggerInner {
119 fn open(path: ModuleLogPath) -> io::Result<Self> {
120 let date = path.date();
121 let file = path.open_file(File::options().create(true).append(true))?;
122 Ok(Self { file, date, path })
123 }
124}
125
126impl DatabaseLogger {
127 pub fn open_today(logs_dir: ModuleLogsDir) -> Self {
128 Self::open(logs_dir.today())
129 }
130
131 pub fn open(path: ModuleLogPath) -> Self {
132 let inner = Mutex::new(DatabaseLoggerInner::open(path).unwrap());
133 let (tx, _) = broadcast::channel(64);
134 Self { inner, tx }
135 }
136
137 #[tracing::instrument(level = "trace", name = "DatabaseLogger::size", skip(self), err)]
138 pub fn size(&self) -> io::Result<u64> {
139 Ok(self.inner.lock().file.metadata()?.len())
140 }
141
142 pub fn write(&self, level: LogLevel, &record: &Record<'_>, bt: &dyn BacktraceProvider) {
143 let (trace, frames);
144 let event = match level {
145 LogLevel::Error => LogEvent::Error(record),
146 LogLevel::Warn => LogEvent::Warn(record),
147 LogLevel::Info => LogEvent::Info(record),
148 LogLevel::Debug => LogEvent::Debug(record),
149 LogLevel::Trace => LogEvent::Trace(record),
150 LogLevel::Panic => {
151 trace = bt.capture();
152 frames = trace.frames();
153 LogEvent::Panic { record, trace: &frames }
154 }
155 };
156 let mut buf = serde_json::to_string(&event).unwrap();
157 buf.push('\n');
158 let mut inner = self.inner.lock();
159 let record_date = record.ts.date_naive();
160 if record_date > inner.date {
161 let new_path = inner.path.with_date(record_date);
162 *inner = DatabaseLoggerInner::open(new_path).unwrap();
163 }
164 inner.file.write_all(buf.as_bytes()).unwrap();
165 let _ = self.tx.send(buf.into());
166 }
167
168 pub async fn read_latest(logs_dir: ModuleLogsDir, num_lines: Option<u32>) -> String {
169 let Some(num_lines) = num_lines else {
172 let path = logs_dir.today();
173 match tokio::fs::read_to_string(&path).await {
175 Ok(contents) => return contents,
176 Err(e) if e.kind() == io::ErrorKind::NotFound => {}
177 Err(e) => panic!("couldn't read log file: {e}"),
178 }
179 let logs_dir = path.popped();
181 return asyncify(move || match logs_dir.most_recent()? {
182 Some(newest_log_file) => std::fs::read_to_string(newest_log_file),
183 None => Ok(String::new()),
184 })
185 .await
186 .expect("couldn't read log file");
187 };
188
189 if num_lines == 0 {
190 return String::new();
191 }
192
193 asyncify(move || read_latest_lines(logs_dir, num_lines))
194 .await
195 .expect("couldn't read log file")
196 }
197
198 pub fn system_logger(&self) -> &SystemLogger {
199 unsafe { &*(self as *const DatabaseLogger as *const SystemLogger) }
201 }
202}
203
204fn read_latest_lines(logs_dir: ModuleLogsDir, num_lines: u32) -> io::Result<String> {
205 use std::fs::File;
206 let path = logs_dir.today();
207 let mut file = match File::open(&path) {
208 Ok(f) => f,
209 Err(e) if e.kind() == io::ErrorKind::NotFound => {
210 let Some(path) = path.popped().most_recent()? else {
211 return Ok(String::new());
212 };
213 File::open(path)?
214 }
215 Err(e) => return Err(e),
216 };
217 let mut lines_read: u32 = 0;
218 let chunk_size = std::cmp::min((num_lines as u64 * 150).next_power_of_two(), 0x10_000);
221 let mut buf = vec![0; chunk_size as usize];
222 let mut pos = file.seek(io::SeekFrom::End(0))?.saturating_sub(1) as usize;
224 'outer: while pos > 0 {
225 let (new_pos, buf) = match pos.checked_sub(buf.len()) {
226 Some(pos) => (pos, &mut buf[..]),
227 None => (0, &mut buf[..pos]),
228 };
229 pos = new_pos;
230 read_exact_at(&file, buf, pos as u64)?;
231 for lf_pos in memchr::Memchr::new(b'\n', buf).rev() {
232 lines_read += 1;
233 if lines_read >= num_lines {
234 pos += lf_pos + 1;
235 break 'outer;
236 }
237 }
238 }
239 file.seek(io::SeekFrom::Start(pos as u64))?;
240 buf.clear();
241 let mut buf = String::from_utf8(buf).unwrap();
242 file.read_to_string(&mut buf)?;
243 Ok(buf)
244}
245
246fn read_exact_at(file: &std::fs::File, buf: &mut [u8], offset: u64) -> io::Result<()> {
247 #[cfg(unix)]
248 {
249 use std::os::unix::fs::FileExt;
250 file.read_exact_at(buf, offset)
251 }
252 #[cfg(not(unix))]
253 {
254 (&*file).seek(io::SeekFrom::Start(offset))?;
255 (&*file).read_exact(buf)
256 }
257}
258
259#[repr(transparent)]
262pub struct SystemLogger {
263 inner: DatabaseLogger,
264}
265
266impl SystemLogger {
267 pub fn info(&self, msg: &str) {
268 self.inner.write(LogLevel::Info, &Self::record(msg), &())
269 }
270
271 pub fn warn(&self, msg: &str) {
272 self.inner.write(LogLevel::Warn, &Self::record(msg), &())
273 }
274
275 pub fn error(&self, msg: &str) {
276 self.inner.write(LogLevel::Error, &Self::record(msg), &())
277 }
278
279 fn record(message: &str) -> Record {
280 Record {
281 ts: Utc::now(),
282 target: None,
283 filename: Some("spacetimedb"),
284 line_number: None,
285 message,
286 }
287 }
288}