chronicle_proxy/database/
logging.rs

1//! Logging events to the database
2use std::{borrow::Cow, time::Duration};
3
4use chrono::Utc;
5use smallvec::SmallVec;
6use tracing::instrument;
7use uuid::Uuid;
8
9use super::{Database, ProxyDatabase};
10use crate::{
11    format::{ChatRequest, ResponseInfo, SingleChatResponse},
12    workflow_events::{EventPayload, WorkflowEvent},
13    ProxyRequestOptions,
14};
15
/// An event from the proxy.
#[derive(Debug)]
pub struct ProxyLogEvent {
    /// A unique ID for this event
    pub id: Uuid,
    /// The type of event (e.g. a proxied request or a custom submitted event)
    pub event_type: Cow<'static, str>,
    /// The timestamp of the event
    pub timestamp: chrono::DateTime<Utc>,
    /// The request that was proxied, if this event represents a proxied call
    pub request: Option<ChatRequest>,
    /// The response from the model provider, collected into a single body if streamed
    pub response: Option<CollectedProxiedResult>,
    /// The latency of the single request attempt that succeeded
    pub latency: Option<Duration>,
    /// The total latency of the request, including retries.
    pub total_latency: Option<Duration>,
    /// Whether the request was rate limited at some point
    pub was_rate_limited: Option<bool>,
    /// The number of retries performed before success or giving up
    pub num_retries: Option<u32>,
    /// The error that occurred, if any.
    pub error: Option<serde_json::Value>,
    /// The options that were used for the request
    pub options: ProxyRequestOptions,
}
42
43impl ProxyLogEvent {
44    /// Create a new event from a submitted payload
45    pub fn from_payload(id: Uuid, payload: EventPayload) -> Self {
46        let extra = match payload.data {
47            Some(serde_json::Value::Object(m)) => Some(m),
48            _ => None,
49        };
50
51        ProxyLogEvent {
52            id,
53            event_type: Cow::Owned(payload.typ),
54            timestamp: payload.time.unwrap_or_else(|| Utc::now()),
55            request: None,
56            response: None,
57            total_latency: None,
58            latency: None,
59            was_rate_limited: None,
60            num_retries: None,
61            error: payload.error,
62            options: ProxyRequestOptions {
63                metadata: crate::ProxyRequestMetadata {
64                    extra,
65                    step_id: Some(payload.step_id),
66                    run_id: Some(payload.run_id),
67                    ..Default::default()
68                },
69                internal_metadata: payload.internal_metadata.unwrap_or_default(),
70                ..Default::default()
71            },
72        }
73    }
74}
75
/// A response from the model provider, collected into a single body if it was streamed
#[derive(Debug)]
pub struct CollectedProxiedResult {
    /// The response itself
    pub body: SingleChatResponse,
    /// Other information about the response
    pub info: ResponseInfo,
    /// The provider which was used for the successful response.
    pub provider: String,
}
86
/// An event to be logged
#[derive(Debug)]
pub enum ProxyLogEntry {
    /// The result of a proxied model request.
    /// Boxed so the enum stays small; `ProxyLogEvent` is a large struct and
    /// an unboxed variant would inflate every `ProxyLogEntry`.
    Proxied(Box<ProxyLogEvent>),
    /// An update from a workflow step or run
    Workflow(WorkflowEvent),
}
95
/// A channel on which log events can be sent.
///
/// Entries are submitted as a [`SmallVec`] with inline capacity 1, so the
/// common case of sending a single entry avoids a heap allocation.
pub type LogSender = flume::Sender<SmallVec<[ProxyLogEntry; 1]>>;
98
99/// Start the database logger task
100pub fn start_database_logger(
101    db: Database,
102    batch_size: usize,
103    debounce_time: Duration,
104) -> (LogSender, tokio::task::JoinHandle<()>) {
105    let (log_tx, log_rx) = flume::unbounded();
106
107    let task = tokio::task::spawn(database_logger_task(db, log_rx, batch_size, debounce_time));
108
109    (log_tx, task)
110}
111
/// Background task that drains the log channel and writes entries to the
/// database in batches.
///
/// A batch is flushed when it reaches `batch_size` entries, or when
/// `debounce_time` elapses while entries are pending. When the channel
/// closes, any remaining entries are flushed before the task exits.
async fn database_logger_task(
    db: Database,
    rx: flume::Receiver<SmallVec<[ProxyLogEntry; 1]>>,
    batch_size: usize,
    debounce_time: Duration,
) {
    let mut batch = Vec::with_capacity(batch_size);

    loop {
        tokio::select! {
            item = rx.recv_async() => {
                let Ok(item) = item else {
                    // channel closed so we're done
                    break;
                };

                tracing::debug!(num_items=item.len(), "Received items");
                batch.extend(item);

                // Flush eagerly once the batch is full, swapping in a fresh
                // preallocated Vec so we keep capacity for the next batch.
                if batch.len() >= batch_size {
                    let send_batch = std::mem::replace(&mut batch, Vec::with_capacity(batch_size));
                    write_batch(db.as_ref(), send_batch).await;
                }

            }
            // The sleep future is recreated on every pass through the loop,
            // so the timer restarts each time an item arrives; this branch
            // fires only after `debounce_time` of inactivity, and only while
            // there is something to flush.
            _ = tokio::time::sleep(debounce_time), if !batch.is_empty() => {
                let send_batch = std::mem::replace(&mut batch, Vec::with_capacity(batch_size));
                write_batch(db.as_ref(), send_batch).await;
            }
        }
    }
    tracing::debug!("Closing database logger");

    // Flush whatever is still pending after the channel closed.
    if !batch.is_empty() {
        write_batch(db.as_ref(), batch).await;
    }
}
149
/// Shared prefix of the SQL `INSERT` statement for the `chronicle_events`
/// table. It ends with `VALUES`, so the database-specific writers append the
/// per-row placeholder tuples for each entry in a batch.
pub(super) const EVENT_INSERT_PREFIX: &str =
        "INSERT INTO chronicle_events
        (id, event_type, organization_id, project_id, user_id, chat_request, chat_response,
         error, provider, model, application, environment, request_organization_id, request_project_id,
         request_user_id, workflow_id, workflow_name, run_id, step_id, step_index,
         prompt_id, prompt_version,
         meta, response_meta, retries, rate_limited, request_latency_ms,
         total_latency_ms, created_at) VALUES\n";
158
159#[instrument(level = "trace", parent=None, skip(db, items), fields(chronicle.db_batch.num_items = items.len()))]
160async fn write_batch(db: &dyn ProxyDatabase, items: Vec<ProxyLogEntry>) {
161    let result = db.write_log_batch(items).await;
162
163    if let Err(e) = result {
164        tracing::error!(error = ?e, "Failed to write logs to database");
165    }
166}