Skip to main content

systemprompt_logging/layer/
mod.rs

1//! `tracing` subscriber layer that persists events to the database.
2//!
3//! [`DatabaseLayer`] buffers log events off the hot path and batch-inserts them
4//! from a background task, flushing on a size threshold, a timer, or
5//! immediately on an error. [`ProxyDatabaseLayer`] is the proxy-side variant.
6
7mod proxy;
8mod visitor;
9
10use std::io::Write;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, OnceLock};
13use std::time::Duration;
14
15use tokio::sync::mpsc;
16use tracing::{Event, Subscriber};
17use tracing_subscriber::Layer;
18use tracing_subscriber::layer::Context;
19use tracing_subscriber::registry::LookupSpan;
20
21pub use proxy::ProxyDatabaseLayer;
22use proxy::{build_log_entry, record_span_fields, update_span_fields};
23
24use crate::models::{LogEntry, LogLevel};
25use systemprompt_database::DbPool;
26use systemprompt_identifiers::{ClientId, ContextId, TaskId};
27
/// Number of buffered entries at which the writer flushes without waiting
/// for the timer.
const BUFFER_FLUSH_SIZE: usize = 100;
/// Period, in seconds, of the timer that flushes a non-empty buffer.
const BUFFER_FLUSH_INTERVAL_SECS: u64 = 10;

/// Upper bound on commands queued between the logging front end and the
/// database writer. Once a burst fills the queue faster than the writer can
/// drain it, further entries are discarded instead of queued, so a logging
/// backlog can never grow the heap without bound.
const CHANNEL_CAPACITY: usize = 8_192;
35
36static BACKGROUND_SENDER: OnceLock<mpsc::Sender<LogCommand>> = OnceLock::new();
37static BACKGROUND_DROPPED: AtomicU64 = AtomicU64::new(0);
38
39/// Enqueues a log entry into the background batch writer, off the caller's hot
40/// path.
41///
42/// Non-blocking: dropped (and counted) if the sink is unattached or the channel
43/// is full. Error entries also request an immediate flush.
44pub fn enqueue_background(entry: LogEntry) {
45    let Some(sender) = BACKGROUND_SENDER.get() else {
46        BACKGROUND_DROPPED.fetch_add(1, Ordering::Relaxed);
47        return;
48    };
49    let is_error = entry.level == LogLevel::Error;
50    if sender.try_send(LogCommand::Entry(Box::new(entry))).is_err() {
51        BACKGROUND_DROPPED.fetch_add(1, Ordering::Relaxed);
52        return;
53    }
54    if is_error {
55        sender.try_send(LogCommand::FlushNow).ok();
56    }
57}
58
59enum LogCommand {
60    Entry(Box<LogEntry>),
61    FlushNow,
62}
63
64/// Bounded sender to the database writer task. On a full channel the entry is
65/// dropped and [`LogChannel::dropped`] is incremented; the send never blocks,
66/// so logging stays off the hot path even under burst.
67struct LogChannel {
68    sender: mpsc::Sender<LogCommand>,
69    dropped: Arc<AtomicU64>,
70}
71
72impl LogChannel {
73    fn new(capacity: usize) -> (Self, mpsc::Receiver<LogCommand>) {
74        let (sender, receiver) = mpsc::channel(capacity);
75        let channel = Self {
76            sender,
77            dropped: Arc::new(AtomicU64::new(0)),
78        };
79        (channel, receiver)
80    }
81
82    fn send(&self, command: LogCommand) {
83        if let Err(mpsc::error::TrySendError::Full(_)) = self.sender.try_send(command) {
84            self.dropped.fetch_add(1, Ordering::Relaxed);
85        }
86    }
87
88    fn dropped(&self) -> u64 {
89        self.dropped.load(Ordering::Relaxed)
90    }
91}
92
93pub struct DatabaseLayer {
94    channel: LogChannel,
95}
96
97impl std::fmt::Debug for DatabaseLayer {
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        f.debug_struct("DatabaseLayer")
100            .field("dropped", &self.channel.dropped())
101            .finish_non_exhaustive()
102    }
103}
104
105impl DatabaseLayer {
106    pub fn new(db_pool: DbPool) -> Self {
107        let (channel, receiver) = LogChannel::new(CHANNEL_CAPACITY);
108
109        BACKGROUND_SENDER.get_or_init(|| channel.sender.clone());
110
111        tokio::spawn(Self::batch_writer(db_pool, receiver));
112
113        Self { channel }
114    }
115
116    async fn batch_writer(db_pool: DbPool, mut receiver: mpsc::Receiver<LogCommand>) {
117        let mut buffer = Vec::with_capacity(BUFFER_FLUSH_SIZE);
118        let mut interval = tokio::time::interval(Duration::from_secs(BUFFER_FLUSH_INTERVAL_SECS));
119        let mut failed_total: u64 = 0;
120
121        loop {
122            tokio::select! {
123                Some(command) = receiver.recv() => {
124                    match command {
125                        LogCommand::Entry(entry) => {
126                            buffer.push(*entry);
127                            if buffer.len() >= BUFFER_FLUSH_SIZE {
128                                Self::flush(&db_pool, &mut buffer, &mut failed_total).await;
129                            }
130                        }
131                        LogCommand::FlushNow => {
132                            if !buffer.is_empty() {
133                                Self::flush(&db_pool, &mut buffer, &mut failed_total).await;
134                            }
135                        }
136                    }
137                }
138                _ = interval.tick() => {
139                    if !buffer.is_empty() {
140                        Self::flush(&db_pool, &mut buffer, &mut failed_total).await;
141                    }
142                }
143            }
144        }
145    }
146
147    async fn flush(db_pool: &DbPool, buffer: &mut Vec<LogEntry>, failed_total: &mut u64) {
148        if let Err(e) = Self::batch_insert(db_pool, buffer).await {
149            let lost = u64::try_from(buffer.len()).unwrap_or(u64::MAX);
150            *failed_total = failed_total.saturating_add(lost);
151            writeln!(
152                std::io::stderr(),
153                "DATABASE LOG FLUSH FAILED ({lost} entries lost this flush, {failed_total} total lost since start): {e}"
154            )
155            .ok();
156        }
157        buffer.clear();
158    }
159
160    async fn batch_insert(
161        db_pool: &DbPool,
162        entries: &[LogEntry],
163    ) -> Result<(), crate::models::LoggingError> {
164        let pool = db_pool.write_pool_arc()?;
165
166        // One commit per flush, fsync off: the audit log is best-effort, so a
167        // few buffered rows lost on an unclean shutdown is an acceptable trade.
168        let mut tx = pool.begin().await?;
169        sqlx::query!("SET LOCAL synchronous_commit = off")
170            .execute(&mut *tx)
171            .await?;
172
173        for entry in entries {
174            let metadata_json: Option<String> = entry
175                .metadata
176                .as_ref()
177                .map(serde_json::to_string)
178                .transpose()?;
179
180            let entry_id = entry.id.as_str();
181            let level_str = entry.level.to_string();
182            let user_id = entry.user_id.as_str();
183            let session_id = entry.session_id.as_str();
184            let task_id = entry.task_id.as_ref().map(TaskId::as_str);
185            let trace_id = entry.trace_id.as_str();
186            let context_id = entry.context_id.as_ref().map(ContextId::as_str);
187            let client_id = entry.client_id.as_ref().map(ClientId::as_str);
188
189            sqlx::query!(
190                r"
191                INSERT INTO logs (id, timestamp, level, module, message, metadata, user_id, session_id, task_id, trace_id, context_id, client_id)
192                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
193                ",
194                entry_id,
195                entry.timestamp,
196                level_str,
197                entry.module,
198                entry.message,
199                metadata_json,
200                user_id,
201                session_id,
202                task_id,
203                trace_id,
204                context_id,
205                client_id
206            )
207            .execute(&mut *tx)
208            .await?;
209        }
210
211        tx.commit().await?;
212        Ok(())
213    }
214}
215
216impl DatabaseLayer {
217    fn send_entry(&self, entry: LogEntry) {
218        let is_error = entry.level == LogLevel::Error;
219        self.channel.send(LogCommand::Entry(Box::new(entry)));
220        if is_error {
221            self.channel.send(LogCommand::FlushNow);
222        }
223    }
224}
225
226impl<S> Layer<S> for DatabaseLayer
227where
228    S: Subscriber + for<'a> LookupSpan<'a>,
229{
230    fn on_new_span(
231        &self,
232        attrs: &tracing::span::Attributes<'_>,
233        id: &tracing::span::Id,
234        ctx: Context<'_, S>,
235    ) {
236        record_span_fields(attrs, id, &ctx);
237    }
238
239    fn on_record(
240        &self,
241        id: &tracing::span::Id,
242        values: &tracing::span::Record<'_>,
243        ctx: Context<'_, S>,
244    ) {
245        update_span_fields(id, values, &ctx);
246    }
247
248    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
249        if let Some(entry) = build_log_entry(event, &ctx) {
250            self.send_entry(entry);
251        }
252    }
253}