// chronicle_proxy/database/logging.rs
use std::{borrow::Cow, time::Duration};

use chrono::Utc;
use smallvec::SmallVec;
use tracing::instrument;
use uuid::Uuid;

use super::{Database, ProxyDatabase};
use crate::{
    format::{ChatRequest, ResponseInfo, SingleChatResponse},
    workflow_events::{EventPayload, WorkflowEvent},
    ProxyRequestOptions,
};
15
16#[derive(Debug)]
18pub struct ProxyLogEvent {
19 pub id: Uuid,
21 pub event_type: Cow<'static, str>,
23 pub timestamp: chrono::DateTime<Utc>,
25 pub request: Option<ChatRequest>,
27 pub response: Option<CollectedProxiedResult>,
29 pub latency: Option<Duration>,
31 pub total_latency: Option<Duration>,
33 pub was_rate_limited: Option<bool>,
35 pub num_retries: Option<u32>,
37 pub error: Option<serde_json::Value>,
39 pub options: ProxyRequestOptions,
41}
42
43impl ProxyLogEvent {
44 pub fn from_payload(id: Uuid, payload: EventPayload) -> Self {
46 let extra = match payload.data {
47 Some(serde_json::Value::Object(m)) => Some(m),
48 _ => None,
49 };
50
51 ProxyLogEvent {
52 id,
53 event_type: Cow::Owned(payload.typ),
54 timestamp: payload.time.unwrap_or_else(|| Utc::now()),
55 request: None,
56 response: None,
57 total_latency: None,
58 latency: None,
59 was_rate_limited: None,
60 num_retries: None,
61 error: payload.error,
62 options: ProxyRequestOptions {
63 metadata: crate::ProxyRequestMetadata {
64 extra,
65 step_id: Some(payload.step_id),
66 run_id: Some(payload.run_id),
67 ..Default::default()
68 },
69 internal_metadata: payload.internal_metadata.unwrap_or_default(),
70 ..Default::default()
71 },
72 }
73 }
74}
75
76#[derive(Debug)]
78pub struct CollectedProxiedResult {
79 pub body: SingleChatResponse,
81 pub info: ResponseInfo,
83 pub provider: String,
85}
86
87#[derive(Debug)]
89pub enum ProxyLogEntry {
90 Proxied(Box<ProxyLogEvent>),
92 Workflow(WorkflowEvent),
94}
95
96pub type LogSender = flume::Sender<SmallVec<[ProxyLogEntry; 1]>>;
98
99pub fn start_database_logger(
101 db: Database,
102 batch_size: usize,
103 debounce_time: Duration,
104) -> (LogSender, tokio::task::JoinHandle<()>) {
105 let (log_tx, log_rx) = flume::unbounded();
106
107 let task = tokio::task::spawn(database_logger_task(db, log_rx, batch_size, debounce_time));
108
109 (log_tx, task)
110}
111
112async fn database_logger_task(
113 db: Database,
114 rx: flume::Receiver<SmallVec<[ProxyLogEntry; 1]>>,
115 batch_size: usize,
116 debounce_time: Duration,
117) {
118 let mut batch = Vec::with_capacity(batch_size);
119
120 loop {
121 tokio::select! {
122 item = rx.recv_async() => {
123 let Ok(item) = item else {
124 break;
126 };
127
128 tracing::debug!(num_items=item.len(), "Received items");
129 batch.extend(item);
130
131 if batch.len() >= batch_size {
132 let send_batch = std::mem::replace(&mut batch, Vec::with_capacity(batch_size));
133 write_batch(db.as_ref(), send_batch).await;
134 }
135
136 }
137 _ = tokio::time::sleep(debounce_time), if !batch.is_empty() => {
138 let send_batch = std::mem::replace(&mut batch, Vec::with_capacity(batch_size));
139 write_batch(db.as_ref(), send_batch).await;
140 }
141 }
142 }
143 tracing::debug!("Closing database logger");
144
145 if !batch.is_empty() {
146 write_batch(db.as_ref(), batch).await;
147 }
148}
149
150pub(super) const EVENT_INSERT_PREFIX: &str =
151 "INSERT INTO chronicle_events
152 (id, event_type, organization_id, project_id, user_id, chat_request, chat_response,
153 error, provider, model, application, environment, request_organization_id, request_project_id,
154 request_user_id, workflow_id, workflow_name, run_id, step_id, step_index,
155 prompt_id, prompt_version,
156 meta, response_meta, retries, rate_limited, request_latency_ms,
157 total_latency_ms, created_at) VALUES\n";
158
159#[instrument(level = "trace", parent=None, skip(db, items), fields(chronicle.db_batch.num_items = items.len()))]
160async fn write_batch(db: &dyn ProxyDatabase, items: Vec<ProxyLogEntry>) {
161 let result = db.write_log_batch(items).await;
162
163 if let Err(e) = result {
164 tracing::error!(error = ?e, "Failed to write logs to database");
165 }
166}