1use anyctx::AnyCtx;
2use async_channel::{Receiver, Sender};
3use chrono::Utc;
4use std::io::{self, Write};
5#[cfg(target_os = "ios")]
6use tracing_oslog::OsLogger;
7use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
8
9use crate::{client::Config, database::DATABASE};
10
11struct DbLogWriter {
12 tx: Sender<Vec<u8>>,
13}
14
15impl Write for DbLogWriter {
16 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
17 let _ = self.tx.try_send(buf.to_vec());
19 Ok(buf.len())
20 }
21
22 fn flush(&mut self) -> io::Result<()> {
23 Ok(())
24 }
25}
26
27async fn ensure_logs_table(ctx: &AnyCtx<Config>) -> Result<(), sqlx::Error> {
28 sqlx::query(
30 "CREATE TABLE IF NOT EXISTS logs (
31 id INTEGER PRIMARY KEY AUTOINCREMENT,
32 ts INTEGER NOT NULL,
33 json TEXT NOT NULL
34 );",
35 )
36 .execute(ctx.get(DATABASE))
37 .await?;
38
39 sqlx::query("CREATE INDEX IF NOT EXISTS logs_ts_idx ON logs(ts);")
41 .execute(ctx.get(DATABASE))
42 .await?;
43
44 let cutoff = Utc::now().timestamp() - 24 * 60 * 60; sqlx::query("DELETE FROM logs WHERE ts < ?")
47 .bind(cutoff)
48 .execute(ctx.get(DATABASE))
49 .await?;
50
51 Ok(())
52}
53
54fn spawn_log_consumer(ctx: AnyCtx<Config>, rx: Receiver<Vec<u8>>) {
55 smolscale::spawn(async move {
56 if ensure_logs_table(&ctx).await.is_err() {
57 return;
59 }
60
61 let mut carry: Vec<u8> = Vec::new();
63 while let Ok(mut chunk) = rx.recv().await {
64 if !carry.is_empty() {
65 carry.extend_from_slice(&chunk);
66 chunk = std::mem::take(&mut carry);
67 }
68
69 let mut start = 0usize;
70 for i in 0..chunk.len() {
71 if chunk[i] == b'\n' {
72 let line = &chunk[start..i];
73 if !line.is_empty() {
74 let _ = sqlx::query("INSERT INTO logs (ts, json) VALUES (?, ?)")
75 .bind(Utc::now().timestamp())
76 .bind(String::from_utf8_lossy(line).to_string())
77 .execute(ctx.get(DATABASE))
78 .await;
79 }
80 start = i + 1;
81 }
82 }
83 if start < chunk.len() {
84 carry.extend_from_slice(&chunk[start..]);
85 }
86 }
87 })
88 .detach();
89}
90
91pub fn init_logging(ctx: &AnyCtx<Config>) -> anyhow::Result<()> {
93 let (tx, rx) = async_channel::unbounded::<Vec<u8>>();
95 spawn_log_consumer(ctx.clone(), rx);
96
97 #[cfg(not(target_os = "ios"))]
98 tracing_subscriber::registry()
99 .with(fmt::layer().compact().with_writer(std::io::stderr))
101 .with(
103 fmt::layer()
104 .json()
105 .with_writer(move || DbLogWriter { tx: tx.clone() }),
106 )
107 .with(
109 EnvFilter::builder()
110 .with_default_directive("geph=debug".parse()?)
111 .from_env_lossy(),
112 )
113 .try_init()?;
114
115 #[cfg(target_os = "ios")]
116 tracing_subscriber::registry()
117 .with(OsLogger::new("geph.io.daemon", "default"))
119 .with(
121 fmt::layer()
122 .json()
123 .with_writer(move || DbLogWriter { tx: tx.clone() }),
124 )
125 .with(
127 EnvFilter::builder()
128 .with_default_directive("geph=debug".parse()?)
129 .from_env_lossy(),
130 )
131 .try_init()?;
132
133 Ok(())
134}
135
136pub async fn get_json_logs(ctx: &AnyCtx<Config>) -> String {
138 match sqlx::query_scalar::<_, String>("SELECT json FROM logs ORDER BY id ASC")
139 .fetch_all(ctx.get(DATABASE))
140 .await
141 {
142 Ok(lines) => lines.join("\n"),
143 Err(_) => String::new(),
144 }
145}