Skip to main content

systemprompt_logging/layer/
proxy.rs

1//! Deferred database-logging tracing layer.
2//!
3//! [`ProxyDatabaseLayer`] is installed in the subscriber stack before a
4//! database pool exists, buffering span attribution into span extensions. Once
5//! [`ProxyDatabaseLayer::attach`] supplies a pool it delegates to the real
6//! `DatabaseLayer`; until then span fields are recorded so attribution is not
7//! lost across the boot window. The free functions build the [`LogEntry`]
8//! actor triple by walking the span tree.
9
10use std::io::Write;
11use std::sync::{Arc, OnceLock};
12
13use chrono::Utc;
14use tracing::{Event, Subscriber};
15use tracing_subscriber::Layer;
16use tracing_subscriber::layer::Context;
17use tracing_subscriber::registry::LookupSpan;
18
19use super::DatabaseLayer;
20use super::visitor::{FieldVisitor, SpanContext, SpanFields, SpanVisitor, extract_span_context};
21use crate::models::{LogEntry, LogLevel};
22use systemprompt_database::DbPool;
23use systemprompt_identifiers::{ClientId, ContextId, LogId, SessionId, TaskId, TraceId, UserId};
24
25#[derive(Clone)]
26pub struct ProxyDatabaseLayer {
27    inner: Arc<OnceLock<DatabaseLayer>>,
28}
29
30impl std::fmt::Debug for ProxyDatabaseLayer {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        f.debug_struct("ProxyDatabaseLayer")
33            .field("attached", &self.inner.get().is_some())
34            .finish()
35    }
36}
37
38impl Default for ProxyDatabaseLayer {
39    fn default() -> Self {
40        Self::new()
41    }
42}
43
44impl ProxyDatabaseLayer {
45    pub fn new() -> Self {
46        Self {
47            inner: Arc::new(OnceLock::new()),
48        }
49    }
50
51    pub fn attach(&self, db_pool: DbPool) {
52        if self.inner.set(DatabaseLayer::new(db_pool)).is_err() {
53            writeln!(
54                std::io::stderr(),
55                "ProxyDatabaseLayer: database layer already attached, ignoring duplicate"
56            )
57            .ok();
58        }
59    }
60}
61
62impl<S> Layer<S> for ProxyDatabaseLayer
63where
64    S: Subscriber + for<'a> LookupSpan<'a>,
65{
66    fn on_new_span(
67        &self,
68        attrs: &tracing::span::Attributes<'_>,
69        id: &tracing::span::Id,
70        ctx: Context<'_, S>,
71    ) {
72        if let Some(db) = self.inner.get() {
73            db.on_new_span(attrs, id, ctx);
74        } else {
75            record_span_fields(attrs, id, &ctx);
76        }
77    }
78
79    fn on_record(
80        &self,
81        id: &tracing::span::Id,
82        values: &tracing::span::Record<'_>,
83        ctx: Context<'_, S>,
84    ) {
85        if let Some(db) = self.inner.get() {
86            db.on_record(id, values, ctx);
87        } else {
88            update_span_fields(id, values, &ctx);
89        }
90    }
91
92    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
93        if let Some(db) = self.inner.get() {
94            db.on_event(event, ctx);
95        }
96    }
97}
98
99pub(super) fn record_span_fields<S>(
100    attrs: &tracing::span::Attributes<'_>,
101    id: &tracing::span::Id,
102    ctx: &Context<'_, S>,
103) where
104    S: Subscriber + for<'a> LookupSpan<'a>,
105{
106    let Some(span) = ctx.span(id) else {
107        return;
108    };
109    let mut fields = SpanFields::default();
110    let mut context = SpanContext::default();
111    let mut visitor = SpanVisitor {
112        context: &mut context,
113    };
114    attrs.record(&mut visitor);
115
116    fields.user = context.user;
117    fields.session = context.session;
118    fields.task = context.task;
119    fields.trace = context.trace;
120    fields.context = context.context;
121    fields.client = context.client;
122
123    let mut extensions = span.extensions_mut();
124    extensions.insert(fields);
125}
126
127pub(super) fn update_span_fields<S>(
128    id: &tracing::span::Id,
129    values: &tracing::span::Record<'_>,
130    ctx: &Context<'_, S>,
131) where
132    S: Subscriber + for<'a> LookupSpan<'a>,
133{
134    if let Some(span) = ctx.span(id) {
135        let mut extensions = span.extensions_mut();
136        if let Some(fields) = extensions.get_mut::<SpanFields>() {
137            let mut context = SpanContext {
138                user: fields.user.clone(),
139                session: fields.session.clone(),
140                task: fields.task.clone(),
141                trace: fields.trace.clone(),
142                context: fields.context.clone(),
143                client: fields.client.clone(),
144            };
145            let mut visitor = SpanVisitor {
146                context: &mut context,
147            };
148            values.record(&mut visitor);
149
150            fields.user = context.user;
151            fields.session = context.session;
152            fields.task = context.task;
153            fields.trace = context.trace;
154            fields.context = context.context;
155            fields.client = context.client;
156        }
157    }
158}
159
160pub(super) fn build_log_entry<S>(event: &Event<'_>, ctx: &Context<'_, S>) -> Option<LogEntry>
161where
162    S: Subscriber + for<'a> LookupSpan<'a>,
163{
164    let level = *event.metadata().level();
165    let module = event.metadata().target().to_owned();
166
167    let mut visitor = FieldVisitor::default();
168    event.record(&mut visitor);
169
170    let span_context = ctx
171        .current_span()
172        .id()
173        .and_then(|id| ctx.span(id))
174        .map(extract_span_context)?;
175
176    let log_level = match level {
177        tracing::Level::ERROR => LogLevel::Error,
178        tracing::Level::WARN => LogLevel::Warn,
179        tracing::Level::INFO => LogLevel::Info,
180        tracing::Level::DEBUG => LogLevel::Debug,
181        tracing::Level::TRACE => LogLevel::Trace,
182    };
183
184    let user_id = UserId::new(span_context.user.as_ref()?.clone());
185    let session_id = SessionId::new(span_context.session.as_ref()?.clone());
186    let trace_id = TraceId::new(span_context.trace.as_ref()?.clone());
187
188    Some(LogEntry {
189        id: LogId::generate(),
190        timestamp: Utc::now(),
191        level: log_level,
192        module,
193        message: visitor.message,
194        metadata: visitor.fields,
195        user_id,
196        session_id,
197        task_id: span_context.task.as_ref().map(|s| TaskId::new(s.clone())),
198        trace_id,
199        context_id: span_context
200            .context
201            .as_ref()
202            .and_then(|s| ContextId::try_new(s.clone()).ok()),
203        client_id: span_context
204            .client
205            .as_ref()
206            .map(|s| ClientId::new(s.clone())),
207    })
208}