use crate::store::Neighbor;
/// A forward-only cursor over an owned list of query results.
///
/// Results are handed out one at a time with [`QueryStream::next`] or in runs
/// with [`QueryStream::take`]. The underlying vector is kept intact, so
/// [`QueryStream::reset`] can rewind to the first result.
pub struct QueryStream {
    results: Vec<Neighbor>,
    /// Index of the next result to hand out; kept `<= results.len()`.
    position: usize,
    /// Set by [`QueryStream::new`] (to 100) and [`QueryStream::with_batch_size`];
    /// no method of this type currently reads it.
    batch_size: usize,
}

impl QueryStream {
    /// Creates a stream positioned at the first of `results`, with a batch size of 100.
    pub fn new(results: Vec<Neighbor>) -> Self {
        Self {
            results,
            position: 0,
            batch_size: 100,
        }
    }

    /// Builder-style override of the stored batch size.
    pub fn with_batch_size(mut self, size: usize) -> Self {
        self.batch_size = size;
        self
    }

    /// Returns the next result and advances past it, or `None` once exhausted.
    // Kept as an inherent method rather than an `Iterator` impl: the returned
    // item borrows from `self`, which `Iterator::next` cannot express.
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Option<&Neighbor> {
        let item = self.results.get(self.position)?;
        self.position += 1;
        Some(item)
    }

    /// Returns the next result without advancing, or `None` once exhausted.
    pub fn peek(&self) -> Option<&Neighbor> {
        self.results.get(self.position)
    }

    /// Number of results not yet handed out or skipped.
    pub fn remaining(&self) -> usize {
        self.results.len().saturating_sub(self.position)
    }

    /// `true` once every result has been handed out or skipped.
    pub fn is_empty(&self) -> bool {
        self.position >= self.results.len()
    }

    /// Rewinds to the first result.
    pub fn reset(&mut self) {
        self.position = 0;
    }

    /// Consumes the stream and returns the results not yet handed out.
    pub fn collect(mut self) -> Vec<Neighbor> {
        self.results.split_off(self.position)
    }

    /// Advances past up to `n` results, stopping at the end of the stream.
    pub fn skip(&mut self, n: usize) {
        // `saturating_add`: a plain `+` overflows (panicking in debug builds)
        // when the caller passes a very large `n`.
        self.position = self.position.saturating_add(n).min(self.results.len());
    }

    /// Returns clones of up to `n` upcoming results and advances past them.
    pub fn take(&mut self, n: usize) -> Vec<Neighbor> {
        // Saturate for the same reason as `skip`.
        let end = self.position.saturating_add(n).min(self.results.len());
        let taken = self.results[self.position..end].to_vec();
        self.position = end;
        taken
    }
}
/// Offset/limit pagination state over a result set whose size is supplied
/// via [`QueryCursor::set_total_count`].
///
/// A `limit` of 0 means there are no pages: [`QueryCursor::page`] and
/// [`QueryCursor::total_pages`] return 0 and [`QueryCursor::has_next`] is `false`.
/// Offset arithmetic saturates at `usize::MAX` instead of overflowing.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueryCursor {
    /// Index of the first row on the current page.
    offset: usize,
    /// Rows per page.
    limit: usize,
    /// Total rows available; 0 until `set_total_count` is called.
    total_count: usize,
}

impl QueryCursor {
    /// Creates a cursor at offset 0 with `limit` rows per page and a total count of 0.
    pub fn new(limit: usize) -> Self {
        Self {
            offset: 0,
            limit,
            total_count: 0,
        }
    }

    /// Zero-based index of the current page (`offset / limit`), or 0 when `limit` is 0.
    pub fn page(&self) -> usize {
        self.offset.checked_div(self.limit).unwrap_or(0)
    }

    /// Moves forward one page. Not bounded by the total count.
    pub fn next_page(&mut self) {
        self.offset = self.offset.saturating_add(self.limit);
    }

    /// Moves back one page, stopping at offset 0.
    pub fn prev_page(&mut self) {
        self.offset = self.offset.saturating_sub(self.limit);
    }

    /// Jumps to the zero-based `page`. Not bounded by the total count.
    pub fn goto_page(&mut self, page: usize) {
        // `saturating_mul`: `page * limit` overflows (panicking in debug builds)
        // for large page numbers.
        self.offset = page.saturating_mul(self.limit);
    }

    /// Index of the first row on the current page.
    pub fn offset(&self) -> usize {
        self.offset
    }

    /// Rows per page.
    pub fn limit(&self) -> usize {
        self.limit
    }

    /// `true` if at least one row lies beyond the current page.
    pub fn has_next(&self) -> bool {
        // With limit 0, `next_page` cannot move the offset, so reporting a next
        // page would make `while has_next() { next_page() }` loop forever.
        self.limit > 0 && self.offset.saturating_add(self.limit) < self.total_count
    }

    /// `true` unless the cursor is at offset 0.
    pub fn has_prev(&self) -> bool {
        self.offset > 0
    }

    /// Records how many rows the full result set contains.
    pub fn set_total_count(&mut self, count: usize) {
        self.total_count = count;
    }

    /// Number of pages needed to cover `total_count` rows, or 0 when `limit` is 0.
    pub fn total_pages(&self) -> usize {
        if self.limit == 0 {
            0
        } else {
            self.total_count.div_ceil(self.limit)
        }
    }
}
/// Hands out an owned list of results as consecutive borrowed chunks.
pub struct BatchedStream {
    results: Vec<Neighbor>,
    /// Maximum chunk length; [`BatchedStream::new`] guarantees it is at least 1.
    batch_size: usize,
    /// Index of the first result not yet handed out; kept `<= results.len()`.
    position: usize,
}

impl BatchedStream {
    /// Creates a stream over `results` that yields chunks of `batch_size` results.
    ///
    /// A `batch_size` of 0 is treated as 1. Left at 0, [`BatchedStream::next_batch`]
    /// would return empty chunks forever and [`BatchedStream::remaining_batches`]
    /// would panic dividing by zero.
    pub fn new(results: Vec<Neighbor>, batch_size: usize) -> Self {
        Self {
            results,
            batch_size: batch_size.max(1),
            position: 0,
        }
    }

    /// Returns the next chunk of up to `batch_size` results, or `None` once every
    /// result has been handed out. Only the final chunk may be shorter.
    pub fn next_batch(&mut self) -> Option<&[Neighbor]> {
        let rest = &self.results[self.position..];
        if rest.is_empty() {
            return None;
        }
        let batch = &rest[..self.batch_size.min(rest.len())];
        self.position += batch.len();
        Some(batch)
    }

    /// Number of further `next_batch` calls that will return `Some`.
    pub fn remaining_batches(&self) -> usize {
        let remaining = self.results.len().saturating_sub(self.position);
        remaining.div_ceil(self.batch_size)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::Metadata;

    /// Builds a result with the given id and score and an empty metadata map.
    fn neighbor(id: &str, score: f32) -> Neighbor {
        let metadata = Metadata {
            fields: Default::default(),
        };
        Neighbor {
            id: id.to_string(),
            score,
            metadata,
        }
    }

    /// Builds `doc1`, `doc2`, ..., `doc{count}` with scores 0.1, 0.2, ....
    fn docs(count: u8) -> Vec<Neighbor> {
        (1..=count)
            .map(|i| neighbor(&format!("doc{i}"), f32::from(i) / 10.0))
            .collect()
    }

    #[test]
    fn test_stream_basic() {
        let mut stream = QueryStream::new(docs(3));
        assert_eq!(stream.remaining(), 3);
        assert!(stream.next().is_some());
        assert_eq!(stream.remaining(), 2);
        // Drain the last two results, then confirm exhaustion.
        for _ in 0..2 {
            assert!(stream.next().is_some());
        }
        assert!(stream.next().is_none());
        assert!(stream.is_empty());
    }

    #[test]
    fn test_stream_peek() {
        let mut stream = QueryStream::new(docs(1));
        assert!(stream.peek().is_some());
        // Peeking must leave the position untouched.
        assert_eq!(stream.remaining(), 1);
        assert!(stream.next().is_some());
        assert_eq!(stream.remaining(), 0);
    }

    #[test]
    fn test_stream_skip() {
        let mut stream = QueryStream::new(docs(3));
        stream.skip(2);
        assert_eq!(stream.remaining(), 1);
        let last = stream.next().unwrap();
        assert_eq!(last.id, "doc3");
    }

    #[test]
    fn test_stream_take() {
        let mut stream = QueryStream::new(docs(3));
        let taken = stream.take(2);
        let ids: Vec<&str> = taken.iter().map(|n| n.id.as_str()).collect();
        assert_eq!(ids, ["doc1", "doc2"]);
        assert_eq!(stream.remaining(), 1);
    }

    #[test]
    fn test_stream_reset() {
        let mut stream = QueryStream::new(docs(1));
        stream.next();
        assert!(stream.is_empty());
        // Rewinding makes the single result available again.
        stream.reset();
        assert!(!stream.is_empty());
        assert!(stream.next().is_some());
    }

    #[test]
    fn test_cursor_pagination() {
        let mut cursor = QueryCursor::new(10);
        cursor.set_total_count(100);

        // Starting position: first of ten pages.
        assert_eq!(cursor.page(), 0);
        assert_eq!(cursor.total_pages(), 10);
        assert!(cursor.has_next());
        assert!(!cursor.has_prev());

        cursor.next_page();
        assert_eq!(cursor.page(), 1);
        assert!(cursor.has_prev());

        cursor.goto_page(5);
        assert_eq!((cursor.page(), cursor.offset()), (5, 50));
    }

    #[test]
    fn test_batched_stream() {
        let mut stream = BatchedStream::new(docs(5), 2);
        let first = stream.next_batch().unwrap();
        assert_eq!(first.len(), 2);
        assert_eq!(first[0].id, "doc1");
        assert_eq!(stream.next_batch().map(|b| b.len()), Some(2));
        assert_eq!(stream.next_batch().map(|b| b.len()), Some(1));
        assert!(stream.next_batch().is_none());
    }

    #[test]
    fn test_batched_remaining() {
        let mut stream = BatchedStream::new(docs(3), 2);
        assert_eq!(stream.remaining_batches(), 2);
        // Each consumed batch lowers the count by one.
        for expected in [1, 0] {
            stream.next_batch();
            assert_eq!(stream.remaining_batches(), expected);
        }
    }
}