systemprompt_logging/layer/
mod.rs1mod proxy;
2pub(crate) mod visitor;
3
4use std::io::Write;
5use std::time::Duration;
6
7use tokio::sync::mpsc;
8use tracing::{Event, Subscriber};
9use tracing_subscriber::Layer;
10use tracing_subscriber::layer::Context;
11use tracing_subscriber::registry::LookupSpan;
12
13pub use proxy::ProxyDatabaseLayer;
14use proxy::{build_log_entry, record_span_fields, update_span_fields};
15
16use crate::models::{LogEntry, LogLevel};
17use systemprompt_database::DbPool;
18use systemprompt_identifiers::{ClientId, ContextId, TaskId};
19
20const BUFFER_FLUSH_SIZE: usize = 100;
21const BUFFER_FLUSH_INTERVAL_SECS: u64 = 10;
22
23enum LogCommand {
24 Entry(Box<LogEntry>),
25 FlushNow,
26}
27
28pub struct DatabaseLayer {
29 sender: mpsc::UnboundedSender<LogCommand>,
30}
31
32impl std::fmt::Debug for DatabaseLayer {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 f.debug_struct("DatabaseLayer").finish_non_exhaustive()
35 }
36}
37
38impl DatabaseLayer {
39 pub fn new(db_pool: DbPool) -> Self {
40 let (sender, receiver) = mpsc::unbounded_channel();
41
42 tokio::spawn(Self::batch_writer(db_pool, receiver));
43
44 Self { sender }
45 }
46
47 async fn batch_writer(db_pool: DbPool, mut receiver: mpsc::UnboundedReceiver<LogCommand>) {
48 let mut buffer = Vec::with_capacity(BUFFER_FLUSH_SIZE);
49 let mut interval = tokio::time::interval(Duration::from_secs(BUFFER_FLUSH_INTERVAL_SECS));
50
51 loop {
52 tokio::select! {
53 Some(command) = receiver.recv() => {
54 match command {
55 LogCommand::Entry(entry) => {
56 buffer.push(*entry);
57 if buffer.len() >= BUFFER_FLUSH_SIZE {
58 Self::flush(&db_pool, &mut buffer).await;
59 }
60 }
61 LogCommand::FlushNow => {
62 if !buffer.is_empty() {
63 Self::flush(&db_pool, &mut buffer).await;
64 }
65 }
66 }
67 }
68 _ = interval.tick() => {
69 if !buffer.is_empty() {
70 Self::flush(&db_pool, &mut buffer).await;
71 }
72 }
73 }
74 }
75 }
76
77 async fn flush(db_pool: &DbPool, buffer: &mut Vec<LogEntry>) {
78 if let Err(e) = Self::batch_insert(db_pool, buffer).await {
79 let _ = writeln!(
80 std::io::stderr(),
81 "DATABASE LOG FLUSH FAILED ({} entries lost): {e}",
82 buffer.len()
83 );
84 }
85 buffer.clear();
86 }
87
88 async fn batch_insert(db_pool: &DbPool, entries: &[LogEntry]) -> anyhow::Result<()> {
89 let pool = db_pool.write_pool_arc()?;
90 for entry in entries {
91 let metadata_json: Option<String> = entry
92 .metadata
93 .as_ref()
94 .map(serde_json::to_string)
95 .transpose()?;
96
97 let entry_id = entry.id.as_str();
98 let level_str = entry.level.to_string();
99 let user_id = entry.user_id.as_str();
100 let session_id = entry.session_id.as_str();
101 let task_id = entry.task_id.as_ref().map(TaskId::as_str);
102 let trace_id = entry.trace_id.as_str();
103 let context_id = entry.context_id.as_ref().map(ContextId::as_str);
104 let client_id = entry.client_id.as_ref().map(ClientId::as_str);
105
106 sqlx::query!(
107 r"
108 INSERT INTO logs (id, level, module, message, metadata, user_id, session_id, task_id, trace_id, context_id, client_id)
109 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
110 ",
111 entry_id,
112 level_str,
113 entry.module,
114 entry.message,
115 metadata_json,
116 user_id,
117 session_id,
118 task_id,
119 trace_id,
120 context_id,
121 client_id
122 )
123 .execute(pool.as_ref())
124 .await?;
125 }
126
127 Ok(())
128 }
129}
130
131impl DatabaseLayer {
132 fn send_entry(&self, entry: LogEntry) {
133 let is_error = entry.level == LogLevel::Error;
134 let _ = self.sender.send(LogCommand::Entry(Box::new(entry)));
135 if is_error {
136 let _ = self.sender.send(LogCommand::FlushNow);
137 }
138 }
139}
140
141impl<S> Layer<S> for DatabaseLayer
142where
143 S: Subscriber + for<'a> LookupSpan<'a>,
144{
145 fn on_new_span(
146 &self,
147 attrs: &tracing::span::Attributes<'_>,
148 id: &tracing::span::Id,
149 ctx: Context<'_, S>,
150 ) {
151 record_span_fields(attrs, id, &ctx);
152 }
153
154 fn on_record(
155 &self,
156 id: &tracing::span::Id,
157 values: &tracing::span::Record<'_>,
158 ctx: Context<'_, S>,
159 ) {
160 update_span_fields(id, values, &ctx);
161 }
162
163 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
164 self.send_entry(build_log_entry(event, &ctx));
165 }
166}