Skip to main content

langfuse/scoring/
manager.rs

//! Score creation and batched submission to the Langfuse API.

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use langfuse_core::config::LangfuseConfig;
use langfuse_core::error::LangfuseError;
use langfuse_core::types::{ScoreBody, ScoreValue};

use crate::http::{build_http_client, retry_request};
use crate::scoring::queue::BatchQueue;
/// Manages score creation and batched submission to the Langfuse API.
///
/// Scores are buffered in a [`BatchQueue`] and flushed to the ingestion
/// endpoint either when the buffer reaches `flush_at` or on explicit
/// [`flush`](ScoreManager::flush) / [`shutdown`](ScoreManager::shutdown).
///
/// A background task periodically flushes buffered scores at the configured
/// `flush_interval`.
pub struct ScoreManager {
    // Connection settings and credentials; a clone is moved into the
    // background flush task at construction time.
    config: LangfuseConfig,
    // HTTP client shared with the background task (reqwest clients are
    // cheap to clone — they share an internal connection pool).
    http_client: reqwest::Client,
    // Buffer of pending scores, shared with the auto-flush task.
    queue: Arc<BatchQueue>,
    // Buffer size at which a flush should be triggered.
    flush_at: usize,
    // Set by `shutdown`/`Drop`; tells the background task to exit.
    cancelled: Arc<AtomicBool>,
    // Handle to the periodic flush task; aborted in `Drop`.
    flush_handle: Option<tokio::task::JoinHandle<()>>,
}
29
30impl std::fmt::Debug for ScoreManager {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        f.debug_struct("ScoreManager")
33            .field("flush_at", &self.flush_at)
34            .field("pending_count", &self.queue.len())
35            .finish()
36    }
37}
38
39impl ScoreManager {
40    /// Create a new `ScoreManager` from the given configuration.
41    ///
42    /// Spawns a background task that auto-flushes buffered scores at the
43    /// configured `flush_interval`.
44    pub fn new(config: &LangfuseConfig) -> Self {
45        let http_client = build_http_client(config);
46        let queue = Arc::new(BatchQueue::new(100_000));
47        let cancelled = Arc::new(AtomicBool::new(false));
48
49        let flush_config = config.clone();
50        let flush_client = http_client.clone();
51        let flush_queue = Arc::clone(&queue);
52        let flush_cancelled = Arc::clone(&cancelled);
53        let flush_interval = config.flush_interval;
54
55        let flush_handle = tokio::spawn(async move {
56            loop {
57                tokio::time::sleep(flush_interval).await;
58                if flush_cancelled.load(Ordering::Relaxed) {
59                    break;
60                }
61                let _ = Self::flush_inner(&flush_config, &flush_client, &flush_queue).await;
62            }
63        });
64
65        Self {
66            config: config.clone(),
67            http_client,
68            queue,
69            flush_at: config.flush_at,
70            cancelled,
71            flush_handle: Some(flush_handle),
72        }
73    }
74
75    /// Create a score and add it to the batch queue.
76    ///
77    /// If the queue reaches `flush_at`, a flush would be triggered
78    /// (currently buffered for explicit flush).
79    pub fn score(&self, body: ScoreBody) {
80        let should_flush = self.queue.push(body);
81        if should_flush || self.queue.len() >= self.flush_at {
82            // Auto-flush background task handles periodic flushing.
83        }
84    }
85
86    /// Score a specific observation within a trace.
87    pub fn score_observation(
88        &self,
89        trace_id: &str,
90        observation_id: &str,
91        name: &str,
92        value: ScoreValue,
93    ) {
94        self.score(ScoreBody {
95            name: name.to_string(),
96            value,
97            trace_id: Some(trace_id.to_string()),
98            observation_id: Some(observation_id.to_string()),
99            comment: None,
100            metadata: None,
101            config_id: None,
102            data_type: None,
103        });
104    }
105
106    /// Score a specific observation with a full [`ScoreBody`].
107    ///
108    /// This is the rich variant of [`score_observation`](ScoreManager::score_observation)
109    /// that accepts a pre-built body with optional comment, metadata, etc.
110    pub fn score_observation_with(&self, body: ScoreBody) {
111        self.score(body);
112    }
113
114    /// Score a trace.
115    pub fn score_trace(&self, trace_id: &str, name: &str, value: ScoreValue) {
116        self.score(ScoreBody {
117            name: name.to_string(),
118            value,
119            trace_id: Some(trace_id.to_string()),
120            observation_id: None,
121            comment: None,
122            metadata: None,
123            config_id: None,
124            data_type: None,
125        });
126    }
127
128    /// Flush all buffered scores to the Langfuse ingestion API.
129    ///
130    /// Drains the buffer and POSTs a batch of `score-create` events.
131    /// Returns `Ok(())` immediately if the buffer is empty.
132    pub async fn flush(&self) -> Result<(), LangfuseError> {
133        Self::flush_inner(&self.config, &self.http_client, &self.queue).await
134    }
135
136    /// Internal flush implementation shared between the public method and
137    /// the background auto-flush task.
138    async fn flush_inner(
139        config: &LangfuseConfig,
140        http_client: &reqwest::Client,
141        queue: &BatchQueue,
142    ) -> Result<(), LangfuseError> {
143        let scores = queue.drain();
144        if scores.is_empty() {
145            return Ok(());
146        }
147
148        let url = format!("{}/ingestion", config.api_base_url());
149        let batch_body = serde_json::json!({
150            "batch": scores.iter().map(|s| {
151                serde_json::json!({
152                    "id": uuid::Uuid::new_v4().to_string(),
153                    "type": "score-create",
154                    "timestamp": chrono::Utc::now().to_rfc3339(),
155                    "body": s,
156                })
157            }).collect::<Vec<_>>()
158        });
159
160        let max_retries = config.max_retries;
161        let client = http_client.clone();
162        let auth = config.basic_auth_header();
163        let url_clone = url.clone();
164        let body_clone = batch_body.clone();
165
166        retry_request(max_retries, || {
167            let client = client.clone();
168            let auth = auth.clone();
169            let url = url_clone.clone();
170            let body = body_clone.clone();
171            async move {
172                let resp = client
173                    .post(&url)
174                    .header("Authorization", &auth)
175                    .header("Content-Type", "application/json")
176                    .json(&body)
177                    .send()
178                    .await
179                    .map_err(LangfuseError::Network)?;
180
181                if !resp.status().is_success() {
182                    let status = resp.status().as_u16();
183                    let message = resp.text().await.unwrap_or_default();
184                    return Err(LangfuseError::Api { status, message });
185                }
186
187                Ok(())
188            }
189        })
190        .await
191    }
192
193    /// Flush all buffered scores and shut down the background task.
194    pub async fn shutdown(&self) -> Result<(), LangfuseError> {
195        self.cancelled.store(true, Ordering::Relaxed);
196        self.flush().await
197    }
198
199    /// Number of scores currently buffered.
200    pub fn pending_count(&self) -> usize {
201        self.queue.len()
202    }
203}
204
205impl Drop for ScoreManager {
206    fn drop(&mut self) {
207        self.cancelled.store(true, Ordering::Relaxed);
208        if let Some(handle) = self.flush_handle.take() {
209            handle.abort();
210        }
211    }
212}