systemprompt_logging/layer/
proxy.rs1use std::io::Write;
11use std::sync::{Arc, OnceLock};
12
13use chrono::Utc;
14use tracing::{Event, Subscriber};
15use tracing_subscriber::Layer;
16use tracing_subscriber::layer::Context;
17use tracing_subscriber::registry::LookupSpan;
18
19use super::DatabaseLayer;
20use super::visitor::{FieldVisitor, SpanContext, SpanFields, SpanVisitor, extract_span_context};
21use crate::models::{LogEntry, LogLevel};
22use systemprompt_database::DbPool;
23use systemprompt_identifiers::{ClientId, ContextId, LogId, SessionId, TaskId, TraceId, UserId};
24
25#[derive(Clone)]
26pub struct ProxyDatabaseLayer {
27 inner: Arc<OnceLock<DatabaseLayer>>,
28}
29
30impl std::fmt::Debug for ProxyDatabaseLayer {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 f.debug_struct("ProxyDatabaseLayer")
33 .field("attached", &self.inner.get().is_some())
34 .finish()
35 }
36}
37
38impl Default for ProxyDatabaseLayer {
39 fn default() -> Self {
40 Self::new()
41 }
42}
43
44impl ProxyDatabaseLayer {
45 pub fn new() -> Self {
46 Self {
47 inner: Arc::new(OnceLock::new()),
48 }
49 }
50
51 pub fn attach(&self, db_pool: DbPool) {
52 if self.inner.set(DatabaseLayer::new(db_pool)).is_err() {
53 writeln!(
54 std::io::stderr(),
55 "ProxyDatabaseLayer: database layer already attached, ignoring duplicate"
56 )
57 .ok();
58 }
59 }
60}
61
62impl<S> Layer<S> for ProxyDatabaseLayer
63where
64 S: Subscriber + for<'a> LookupSpan<'a>,
65{
66 fn on_new_span(
67 &self,
68 attrs: &tracing::span::Attributes<'_>,
69 id: &tracing::span::Id,
70 ctx: Context<'_, S>,
71 ) {
72 if let Some(db) = self.inner.get() {
73 db.on_new_span(attrs, id, ctx);
74 } else {
75 record_span_fields(attrs, id, &ctx);
76 }
77 }
78
79 fn on_record(
80 &self,
81 id: &tracing::span::Id,
82 values: &tracing::span::Record<'_>,
83 ctx: Context<'_, S>,
84 ) {
85 if let Some(db) = self.inner.get() {
86 db.on_record(id, values, ctx);
87 } else {
88 update_span_fields(id, values, &ctx);
89 }
90 }
91
92 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
93 if let Some(db) = self.inner.get() {
94 db.on_event(event, ctx);
95 }
96 }
97}
98
99pub(super) fn record_span_fields<S>(
100 attrs: &tracing::span::Attributes<'_>,
101 id: &tracing::span::Id,
102 ctx: &Context<'_, S>,
103) where
104 S: Subscriber + for<'a> LookupSpan<'a>,
105{
106 let Some(span) = ctx.span(id) else {
107 return;
108 };
109 let mut fields = SpanFields::default();
110 let mut context = SpanContext::default();
111 let mut visitor = SpanVisitor {
112 context: &mut context,
113 };
114 attrs.record(&mut visitor);
115
116 fields.user = context.user;
117 fields.session = context.session;
118 fields.task = context.task;
119 fields.trace = context.trace;
120 fields.context = context.context;
121 fields.client = context.client;
122
123 let mut extensions = span.extensions_mut();
124 extensions.insert(fields);
125}
126
127pub(super) fn update_span_fields<S>(
128 id: &tracing::span::Id,
129 values: &tracing::span::Record<'_>,
130 ctx: &Context<'_, S>,
131) where
132 S: Subscriber + for<'a> LookupSpan<'a>,
133{
134 if let Some(span) = ctx.span(id) {
135 let mut extensions = span.extensions_mut();
136 if let Some(fields) = extensions.get_mut::<SpanFields>() {
137 let mut context = SpanContext {
138 user: fields.user.clone(),
139 session: fields.session.clone(),
140 task: fields.task.clone(),
141 trace: fields.trace.clone(),
142 context: fields.context.clone(),
143 client: fields.client.clone(),
144 };
145 let mut visitor = SpanVisitor {
146 context: &mut context,
147 };
148 values.record(&mut visitor);
149
150 fields.user = context.user;
151 fields.session = context.session;
152 fields.task = context.task;
153 fields.trace = context.trace;
154 fields.context = context.context;
155 fields.client = context.client;
156 }
157 }
158}
159
160pub(super) fn build_log_entry<S>(event: &Event<'_>, ctx: &Context<'_, S>) -> Option<LogEntry>
161where
162 S: Subscriber + for<'a> LookupSpan<'a>,
163{
164 let level = *event.metadata().level();
165 let module = event.metadata().target().to_owned();
166
167 let mut visitor = FieldVisitor::default();
168 event.record(&mut visitor);
169
170 let span_context = ctx
171 .current_span()
172 .id()
173 .and_then(|id| ctx.span(id))
174 .map(extract_span_context)?;
175
176 let log_level = match level {
177 tracing::Level::ERROR => LogLevel::Error,
178 tracing::Level::WARN => LogLevel::Warn,
179 tracing::Level::INFO => LogLevel::Info,
180 tracing::Level::DEBUG => LogLevel::Debug,
181 tracing::Level::TRACE => LogLevel::Trace,
182 };
183
184 let user_id = UserId::new(span_context.user.as_ref()?.clone());
185 let session_id = SessionId::new(span_context.session.as_ref()?.clone());
186 let trace_id = TraceId::new(span_context.trace.as_ref()?.clone());
187
188 Some(LogEntry {
189 id: LogId::generate(),
190 timestamp: Utc::now(),
191 level: log_level,
192 module,
193 message: visitor.message,
194 metadata: visitor.fields,
195 user_id,
196 session_id,
197 task_id: span_context.task.as_ref().map(|s| TaskId::new(s.clone())),
198 trace_id,
199 context_id: span_context
200 .context
201 .as_ref()
202 .and_then(|s| ContextId::try_new(s.clone()).ok()),
203 client_id: span_context
204 .client
205 .as_ref()
206 .map(|s| ClientId::new(s.clone())),
207 })
208}