use anyctx::AnyCtx;
use async_channel::{Receiver, Sender};
use chrono::Utc;
use std::io::{self, Write};
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
use crate::{client::Config, database::DATABASE};
/// `io::Write` adapter that forwards every written buffer over a channel
/// instead of writing to a file or terminal.
///
/// `init_logging` hands instances of this type to the JSON `fmt` layer, and
/// `spawn_log_consumer` drains the receiving half of the same channel.
struct DbLogWriter {
/// Sending half of the channel carrying raw log bytes to the consumer task.
tx: Sender<Vec<u8>>,
}
impl Write for DbLogWriter {
    /// Copies `buf` into an owned vector and queues it on the channel.
    ///
    /// The whole buffer is always reported as written, even when the send
    /// fails, so a broken log sink never surfaces as an I/O error to the
    /// caller.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let accepted = buf.len();
        let payload = buf.to_vec();
        // Best effort: a failed send just drops this piece of log output.
        let _ = self.tx.try_send(payload);
        Ok(accepted)
    }

    /// Nothing is buffered inside the writer, so flushing is a no-op.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
/// Prepares the `logs` table for use.
///
/// Runs three statements in order against the database stored in `ctx`:
/// creates the table if it is missing, creates the index on `ts` if it is
/// missing, and deletes every row whose `ts` is more than 24 hours old.
///
/// # Errors
///
/// Returns the first `sqlx::Error` produced by any of the statements; later
/// statements are not attempted.
async fn ensure_logs_table(ctx: &AnyCtx<Config>) -> Result<(), sqlx::Error> {
    // Age, in seconds, beyond which existing rows are purged (24 hours).
    const RETENTION_SECS: i64 = 24 * 60 * 60;

    let pool = ctx.get(DATABASE);

    sqlx::query(
        "CREATE TABLE IF NOT EXISTS logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts INTEGER NOT NULL,
json TEXT NOT NULL
);",
    )
    .execute(pool)
    .await?;

    sqlx::query("CREATE INDEX IF NOT EXISTS logs_ts_idx ON logs(ts);")
        .execute(pool)
        .await?;

    // `ts` is stored as a Unix timestamp in seconds, so the cutoff is too.
    let oldest_kept = Utc::now().timestamp() - RETENTION_SECS;
    sqlx::query("DELETE FROM logs WHERE ts < ?")
        .bind(oldest_kept)
        .execute(pool)
        .await?;

    Ok(())
}
/// Spawns a detached background task that stores log lines in the `logs` table.
///
/// The task first calls `ensure_logs_table`. It then receives byte chunks
/// from `rx`, splits the byte stream on `\n`, and inserts every non-empty
/// line as one row together with the current Unix timestamp in seconds.
/// Bytes after the last newline are kept until a later chunk completes the
/// line. The task ends when the channel is closed; an unfinished trailing
/// line is discarded at that point.
///
/// If the table cannot be prepared, the problem is reported on stderr and
/// the task exits, which drops `rx`.
fn spawn_log_consumer(ctx: AnyCtx<Config>, rx: Receiver<Vec<u8>>) {
    smolscale::spawn(async move {
        if let Err(err) = ensure_logs_table(&ctx).await {
            // Written straight to stderr: this task backs the tracing output
            // itself and may run before a subscriber is installed, so a
            // tracing event emitted here could go nowhere.
            eprintln!(
                "log persistence disabled: could not prepare logs table: {}",
                err
            );
            return;
        }

        // Bytes received so far that do not yet form a complete line.
        let mut pending: Vec<u8> = Vec::new();
        while let Ok(chunk) = rx.recv().await {
            pending.extend_from_slice(&chunk);

            // Offset of the first byte not yet consumed as part of a line.
            let mut start = 0usize;
            while let Some(offset) = pending[start..].iter().position(|&b| b == b'\n') {
                let end = start + offset;
                if end > start {
                    // Invalid UTF-8 is replaced rather than rejected so that a
                    // malformed line is still recorded.
                    let json = String::from_utf8_lossy(&pending[start..end]).into_owned();
                    // Insert failures are deliberately ignored: persistence is
                    // best effort and must not stop the consumer.
                    let _ = sqlx::query("INSERT INTO logs (ts, json) VALUES (?, ?)")
                        .bind(Utc::now().timestamp())
                        .bind(json)
                        .execute(ctx.get(DATABASE))
                        .await;
                }
                start = end + 1;
            }

            // Keep only the unterminated tail for the next chunk.
            pending.drain(..start);
        }
    })
    .detach();
}
/// Installs the global tracing subscriber and starts database log capture.
///
/// Two `fmt` layers are registered: a compact one writing to stderr, and a
/// JSON one whose output is sent through an unbounded channel to the task
/// started by `spawn_log_consumer`. An `EnvFilter` is registered alongside
/// them, using `geph=debug` as its default directive.
///
/// # Errors
///
/// Returns an error if the default directive fails to parse or if
/// `try_init` fails.
pub fn init_logging(ctx: &AnyCtx<Config>) -> anyhow::Result<()> {
    let (tx, rx) = async_channel::unbounded::<Vec<u8>>();
    // The consumer is started before the subscriber is installed.
    spawn_log_consumer(ctx.clone(), rx);

    let stderr_layer = fmt::layer().compact().with_writer(std::io::stderr);

    // Each writer requested by the layer gets its own clone of the sender.
    let make_db_writer = move || DbLogWriter { tx: tx.clone() };
    let db_layer = fmt::layer().json().with_writer(make_db_writer);

    let env_filter = EnvFilter::builder()
        .with_default_directive("geph=debug".parse()?)
        .from_env_lossy();

    tracing_subscriber::registry()
        .with(stderr_layer)
        .with(db_layer)
        .with(env_filter)
        .try_init()?;

    Ok(())
}
/// Returns every stored log line, ordered by ascending row id and joined
/// with `\n`.
///
/// Any database error is swallowed and yields an empty string, so callers
/// cannot tell "no logs" apart from "query failed".
pub async fn get_json_logs(ctx: &AnyCtx<Config>) -> String {
    let fetched = sqlx::query_scalar::<_, String>("SELECT json FROM logs ORDER BY id ASC")
        .fetch_all(ctx.get(DATABASE))
        .await;

    fetched
        .map(|lines| lines.join("\n"))
        .unwrap_or_default()
}