Skip to main content

geph5_client/
logging.rs

1use anyctx::AnyCtx;
2use async_channel::{Receiver, Sender};
3use chrono::Utc;
4use std::io::{self, Write};
5#[cfg(target_os = "ios")]
6use tracing_oslog::OsLogger;
7use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
8
9use crate::{client::Config, database::DATABASE};
10
11struct DbLogWriter {
12    tx: Sender<Vec<u8>>,
13}
14
15impl Write for DbLogWriter {
16    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
17        // Non-blocking: if the channel is full or closed, drop the log
18        let _ = self.tx.try_send(buf.to_vec());
19        Ok(buf.len())
20    }
21
22    fn flush(&mut self) -> io::Result<()> {
23        Ok(())
24    }
25}
26
27async fn ensure_logs_table(ctx: &AnyCtx<Config>) -> Result<(), sqlx::Error> {
28    // Ensure table exists
29    sqlx::query(
30        "CREATE TABLE IF NOT EXISTS logs (
31            id INTEGER PRIMARY KEY AUTOINCREMENT,
32            ts INTEGER NOT NULL,
33            json TEXT NOT NULL
34        );",
35    )
36    .execute(ctx.get(DATABASE))
37    .await?;
38
39    // Ensure an index on timestamp for faster range deletes/queries
40    sqlx::query("CREATE INDEX IF NOT EXISTS logs_ts_idx ON logs(ts);")
41        .execute(ctx.get(DATABASE))
42        .await?;
43
44    // Delete logs older than 24 hours (run once per startup)
45    let cutoff = Utc::now().timestamp() - 24 * 60 * 60; // seconds
46    sqlx::query("DELETE FROM logs WHERE ts < ?")
47        .bind(cutoff)
48        .execute(ctx.get(DATABASE))
49        .await?;
50
51    Ok(())
52}
53
54fn spawn_log_consumer(ctx: AnyCtx<Config>, rx: Receiver<Vec<u8>>) {
55    smolscale::spawn(async move {
56        if ensure_logs_table(&ctx).await.is_err() {
57            // If we can't ensure the table, abort consumer silently
58            return;
59        }
60
61        // Accumulate partial lines between channel messages
62        let mut carry: Vec<u8> = Vec::new();
63        while let Ok(mut chunk) = rx.recv().await {
64            if !carry.is_empty() {
65                carry.extend_from_slice(&chunk);
66                chunk = std::mem::take(&mut carry);
67            }
68
69            let mut start = 0usize;
70            for i in 0..chunk.len() {
71                if chunk[i] == b'\n' {
72                    let line = &chunk[start..i];
73                    if !line.is_empty() {
74                        let _ = sqlx::query("INSERT INTO logs (ts, json) VALUES (?, ?)")
75                            .bind(Utc::now().timestamp())
76                            .bind(String::from_utf8_lossy(line).to_string())
77                            .execute(ctx.get(DATABASE))
78                            .await;
79                    }
80                    start = i + 1;
81                }
82            }
83            if start < chunk.len() {
84                carry.extend_from_slice(&chunk[start..]);
85            }
86        }
87    })
88    .detach();
89}
90
91/// Initialize the tracing subscribers for logging, storing JSON logs in SQLite
92pub fn init_logging(ctx: &AnyCtx<Config>) -> anyhow::Result<()> {
93    // Channel and consumer that writes logs into DB
94    let (tx, rx) = async_channel::unbounded::<Vec<u8>>();
95    spawn_log_consumer(ctx.clone(), rx);
96
97    #[cfg(not(target_os = "ios"))]
98    tracing_subscriber::registry()
99        // Standard logs to stderr (for console display)
100        .with(fmt::layer().compact().with_writer(std::io::stderr))
101        // JSON logs persisted via DB writer
102        .with(
103            fmt::layer()
104                .json()
105                .with_writer(move || DbLogWriter { tx: tx.clone() }),
106        )
107        // Set filtering based on environment or defaults
108        .with(
109            EnvFilter::builder()
110                .with_default_directive("geph=debug".parse()?)
111                .from_env_lossy(),
112        )
113        .try_init()?;
114
115    #[cfg(target_os = "ios")]
116    tracing_subscriber::registry()
117        // Standard logs to stderr (for console display)
118        .with(OsLogger::new("geph.io.daemon", "default"))
119        // JSON logs persisted via DB writer
120        .with(
121            fmt::layer()
122                .json()
123                .with_writer(move || DbLogWriter { tx: tx.clone() }),
124        )
125        // Set filtering based on environment or defaults
126        .with(
127            EnvFilter::builder()
128                .with_default_directive("geph=debug".parse()?)
129                .from_env_lossy(),
130        )
131        .try_init()?;
132
133    Ok(())
134}
135
136/// Get the current JSON logs as a String by reading from DB
137pub async fn get_json_logs(ctx: &AnyCtx<Config>) -> String {
138    match sqlx::query_scalar::<_, String>("SELECT json FROM logs ORDER BY id ASC")
139        .fetch_all(ctx.get(DATABASE))
140        .await
141    {
142        Ok(lines) => lines.join("\n"),
143        Err(_) => String::new(),
144    }
145}