Skip to main content

langfuse/scoring/
queue.rs

1//! Thread-safe batch queue for score events.
2
3use std::sync::{Arc, Mutex};
4
5use langfuse_core::types::ScoreBody;
6
7/// A thread-safe batch queue for score events.
8///
9/// Buffers [`ScoreBody`] items until the queue reaches `max_size`,
10/// at which point [`push`](BatchQueue::push) signals that a flush is needed.
11pub struct BatchQueue {
12    buffer: Arc<Mutex<Vec<ScoreBody>>>,
13    max_size: usize,
14}
15
16impl BatchQueue {
17    /// Create a new `BatchQueue` with the given maximum buffer size.
18    pub fn new(max_size: usize) -> Self {
19        Self {
20            buffer: Arc::new(Mutex::new(Vec::new())),
21            max_size,
22        }
23    }
24
25    /// Add a score to the queue.
26    ///
27    /// Returns `true` if the queue has reached `max_size` (flush needed).
28    pub fn push(&self, score: ScoreBody) -> bool {
29        let mut buf = self.buffer.lock().expect("BatchQueue lock poisoned");
30        buf.push(score);
31        buf.len() >= self.max_size
32    }
33
34    /// Drain all buffered scores, returning them and leaving the buffer empty.
35    pub fn drain(&self) -> Vec<ScoreBody> {
36        let mut buf = self.buffer.lock().expect("BatchQueue lock poisoned");
37        std::mem::take(&mut *buf)
38    }
39
40    /// Number of buffered scores.
41    pub fn len(&self) -> usize {
42        self.buffer.lock().expect("BatchQueue lock poisoned").len()
43    }
44
45    /// Returns `true` if the buffer is empty.
46    pub fn is_empty(&self) -> bool {
47        self.buffer
48            .lock()
49            .expect("BatchQueue lock poisoned")
50            .is_empty()
51    }
52}