langfuse/scoring/
manager.rs1use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5
6use langfuse_core::config::LangfuseConfig;
7use langfuse_core::error::LangfuseError;
8use langfuse_core::types::{ScoreBody, ScoreValue};
9
10use crate::http::{build_http_client, retry_request};
11use crate::scoring::queue::BatchQueue;
12
13pub struct ScoreManager {
22 config: LangfuseConfig,
23 http_client: reqwest::Client,
24 queue: Arc<BatchQueue>,
25 flush_at: usize,
26 cancelled: Arc<AtomicBool>,
27 flush_handle: Option<tokio::task::JoinHandle<()>>,
28}
29
30impl std::fmt::Debug for ScoreManager {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 f.debug_struct("ScoreManager")
33 .field("flush_at", &self.flush_at)
34 .field("pending_count", &self.queue.len())
35 .finish()
36 }
37}
38
39impl ScoreManager {
40 pub fn new(config: &LangfuseConfig) -> Self {
45 let http_client = build_http_client(config);
46 let queue = Arc::new(BatchQueue::new(100_000));
47 let cancelled = Arc::new(AtomicBool::new(false));
48
49 let flush_config = config.clone();
50 let flush_client = http_client.clone();
51 let flush_queue = Arc::clone(&queue);
52 let flush_cancelled = Arc::clone(&cancelled);
53 let flush_interval = config.flush_interval;
54
55 let flush_handle = tokio::spawn(async move {
56 loop {
57 tokio::time::sleep(flush_interval).await;
58 if flush_cancelled.load(Ordering::Relaxed) {
59 break;
60 }
61 let _ = Self::flush_inner(&flush_config, &flush_client, &flush_queue).await;
62 }
63 });
64
65 Self {
66 config: config.clone(),
67 http_client,
68 queue,
69 flush_at: config.flush_at,
70 cancelled,
71 flush_handle: Some(flush_handle),
72 }
73 }
74
75 pub fn score(&self, body: ScoreBody) {
80 let should_flush = self.queue.push(body);
81 if should_flush || self.queue.len() >= self.flush_at {
82 }
84 }
85
86 pub fn score_observation(
88 &self,
89 trace_id: &str,
90 observation_id: &str,
91 name: &str,
92 value: ScoreValue,
93 ) {
94 self.score(ScoreBody {
95 name: name.to_string(),
96 value,
97 trace_id: Some(trace_id.to_string()),
98 observation_id: Some(observation_id.to_string()),
99 comment: None,
100 metadata: None,
101 config_id: None,
102 data_type: None,
103 });
104 }
105
106 pub fn score_observation_with(&self, body: ScoreBody) {
111 self.score(body);
112 }
113
114 pub fn score_trace(&self, trace_id: &str, name: &str, value: ScoreValue) {
116 self.score(ScoreBody {
117 name: name.to_string(),
118 value,
119 trace_id: Some(trace_id.to_string()),
120 observation_id: None,
121 comment: None,
122 metadata: None,
123 config_id: None,
124 data_type: None,
125 });
126 }
127
128 pub async fn flush(&self) -> Result<(), LangfuseError> {
133 Self::flush_inner(&self.config, &self.http_client, &self.queue).await
134 }
135
136 async fn flush_inner(
139 config: &LangfuseConfig,
140 http_client: &reqwest::Client,
141 queue: &BatchQueue,
142 ) -> Result<(), LangfuseError> {
143 let scores = queue.drain();
144 if scores.is_empty() {
145 return Ok(());
146 }
147
148 let url = format!("{}/ingestion", config.api_base_url());
149 let batch_body = serde_json::json!({
150 "batch": scores.iter().map(|s| {
151 serde_json::json!({
152 "id": uuid::Uuid::new_v4().to_string(),
153 "type": "score-create",
154 "timestamp": chrono::Utc::now().to_rfc3339(),
155 "body": s,
156 })
157 }).collect::<Vec<_>>()
158 });
159
160 let max_retries = config.max_retries;
161 let client = http_client.clone();
162 let auth = config.basic_auth_header();
163 let url_clone = url.clone();
164 let body_clone = batch_body.clone();
165
166 retry_request(max_retries, || {
167 let client = client.clone();
168 let auth = auth.clone();
169 let url = url_clone.clone();
170 let body = body_clone.clone();
171 async move {
172 let resp = client
173 .post(&url)
174 .header("Authorization", &auth)
175 .header("Content-Type", "application/json")
176 .json(&body)
177 .send()
178 .await
179 .map_err(LangfuseError::Network)?;
180
181 if !resp.status().is_success() {
182 let status = resp.status().as_u16();
183 let message = resp.text().await.unwrap_or_default();
184 return Err(LangfuseError::Api { status, message });
185 }
186
187 Ok(())
188 }
189 })
190 .await
191 }
192
193 pub async fn shutdown(&self) -> Result<(), LangfuseError> {
195 self.cancelled.store(true, Ordering::Relaxed);
196 self.flush().await
197 }
198
199 pub fn pending_count(&self) -> usize {
201 self.queue.len()
202 }
203}
204
205impl Drop for ScoreManager {
206 fn drop(&mut self) {
207 self.cancelled.store(true, Ordering::Relaxed);
208 if let Some(handle) = self.flush_handle.take() {
209 handle.abort();
210 }
211 }
212}