// velesdb_core/collection/streaming/ingester.rs
use crate::collection::types::Collection;
use crate::point::Point;

use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::Notify;
/// Tuning knobs for the streaming ingestion pipeline.
///
/// This is plain integer data, so it additionally derives `Copy`,
/// `PartialEq` and `Eq` to allow cheap passing and value comparison
/// (e.g. in tests and config reconciliation).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamingConfig {
    /// Capacity of the bounded channel between producers and the drain
    /// task; `try_send` reports backpressure once this many points queue up.
    pub buffer_size: usize,

    /// Number of points accumulated before the drain task flushes a batch.
    pub batch_size: usize,

    /// Maximum time, in milliseconds, a partial batch may wait before it is
    /// flushed anyway.
    pub flush_interval_ms: u64,
}
39impl Default for StreamingConfig {
40 fn default() -> Self {
41 Self {
42 buffer_size: 10_000,
43 batch_size: 128,
44 flush_interval_ms: 50,
45 }
46 }
47}
/// Origin of a write, distinguishing direct API calls from streamed points.
// NOTE(review): not referenced anywhere in this file (hence the allow) —
// presumably reserved for future write-path routing; confirm before removing.
#[allow(dead_code)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum WriteMode {
    /// Write submitted through the regular collection API.
    Api,
    /// Write submitted through the streaming ingester.
    Streaming,
}
/// Errors surfaced to producers when a point cannot be enqueued for
/// streaming ingestion.
#[derive(Debug, thiserror::Error)]
pub enum BackpressureError {
    /// The bounded channel is at capacity; the caller should back off
    /// and retry.
    #[error("streaming buffer is full (backpressure)")]
    BufferFull,

    /// No streaming ingester exists for this collection.
    // NOTE(review): never constructed in this file — presumably emitted by
    // callers that look up an ingester; confirm at the call sites.
    #[error("streaming is not configured on this collection")]
    NotConfigured,

    /// The receiving half of the channel is gone, i.e. the drain task has
    /// exited; no further sends can succeed.
    #[error("streaming drain task has exited; the ingestion pipeline is dead")]
    DrainTaskDead,
}
/// Handle for pushing points into a collection through a buffered,
/// batching background pipeline (see [`drain_loop`]).
pub struct StreamIngester {
    // Producer side of the bounded point channel.
    sender: mpsc::Sender<Point>,
    // Configuration the ingester was created with; exposed via `config()`.
    config: StreamingConfig,
    // Join handle of the spawned drain task; `None` once consumed by
    // `shutdown` (or taken by `Drop`).
    drain_handle: Option<tokio::task::JoinHandle<()>>,
    // Signal telling the drain task to flush its remainder and exit.
    shutdown: Arc<Notify>,
}
105impl StreamIngester {
106 #[must_use]
111 pub fn new(collection: Collection, config: StreamingConfig) -> Self {
112 let (tx, rx) = mpsc::channel(config.buffer_size);
113 let shutdown = Arc::new(Notify::new());
114
115 let drain_handle = tokio::spawn(drain_loop(
116 collection,
117 rx,
118 config.batch_size,
119 config.flush_interval_ms,
120 Arc::clone(&shutdown),
121 ));
122
123 Self {
124 sender: tx,
125 config,
126 drain_handle: Some(drain_handle),
127 shutdown,
128 }
129 }
130
131 pub fn try_send(&self, point: Point) -> Result<(), BackpressureError> {
142 self.sender.try_send(point).map_err(|e| match e {
143 mpsc::error::TrySendError::Full(_) => BackpressureError::BufferFull,
144 mpsc::error::TrySendError::Closed(_) => BackpressureError::DrainTaskDead,
145 })
146 }
147
148 #[must_use]
150 pub fn config(&self) -> &StreamingConfig {
151 &self.config
152 }
153
154 pub async fn shutdown(mut self) {
158 self.shutdown.notify_one();
159 if let Some(handle) = self.drain_handle.take() {
160 let _ = handle.await;
162 }
163 }
164}
impl Drop for StreamIngester {
    fn drop(&mut self) {
        // Last-resort cleanup: we cannot await in `drop`, so the drain task
        // is aborted and any still-buffered points are lost. Callers who
        // need a flushing stop must use `shutdown()` (which takes the
        // handle, making this a no-op afterwards).
        if let Some(handle) = self.drain_handle.take() {
            handle.abort();
        }
    }
}
/// Background task: receives points from `rx` and writes them to
/// `collection` in batches.
///
/// A batch is flushed when it reaches `batch_size`, when `flush_interval_ms`
/// elapses with data pending, or when shutdown is signalled / all senders
/// are dropped (both of which also terminate the loop after a final flush).
async fn drain_loop(
    collection: Collection,
    mut rx: mpsc::Receiver<Point>,
    batch_size: usize,
    flush_interval_ms: u64,
    shutdown: Arc<Notify>,
) {
    let mut batch: Vec<Point> = Vec::with_capacity(batch_size);
    let mut interval = tokio::time::interval(std::time::Duration::from_millis(flush_interval_ms));
    // A tokio interval's first tick completes immediately; consume it so the
    // timer arm below only fires after a full interval has elapsed.
    interval.tick().await;

    loop {
        tokio::select! {
            // Graceful shutdown: drain whatever is already queued
            // (non-blocking), flush the remainder, and exit.
            () = shutdown.notified() => {
                while let Ok(point) = rx.try_recv() {
                    batch.push(point);
                    if batch.len() >= batch_size {
                        flush_batch(&collection, &mut batch).await;
                    }
                }
                if !batch.is_empty() {
                    flush_batch(&collection, &mut batch).await;
                }
                break;
            }

            // Periodic flush so a partial batch never waits indefinitely.
            _ = interval.tick() => {
                if !batch.is_empty() {
                    flush_batch(&collection, &mut batch).await;
                }
            }

            // Normal ingestion path.
            msg = rx.recv() => {
                if let Some(point) = msg {
                    batch.push(point);
                    if batch.len() >= batch_size {
                        flush_batch(&collection, &mut batch).await;
                        // Restart the timer after a size-triggered flush so
                        // the next interval tick doesn't fire right away on
                        // a freshly emptied batch.
                        interval.reset();
                    }
                } else {
                    // All senders dropped: flush the remainder and stop.
                    if !batch.is_empty() {
                        flush_batch(&collection, &mut batch).await;
                    }
                    break;
                }
            }
        }
    }
}
241async fn flush_batch(collection: &Collection, batch: &mut Vec<Point>) {
247 let points: Vec<Point> = std::mem::take(batch);
248
249 let delta_entries: Vec<(u64, Vec<f32>)> = if collection.delta_buffer.is_active() {
252 points.iter().map(|p| (p.id, p.vector.clone())).collect()
253 } else {
254 Vec::new()
255 };
256
257 let coll = collection.clone();
258 let result = tokio::task::spawn_blocking(move || coll.upsert(points)).await;
261 match result {
262 Ok(Ok(())) => {
263 if !delta_entries.is_empty() {
267 collection.delta_buffer.extend(delta_entries);
268 }
269 }
270 Ok(Err(e)) => {
271 tracing::error!("Streaming drain flush failed: {e}");
272 }
273 Err(e) => {
274 tracing::error!("Streaming drain task panicked: {e}");
275 }
276 }
277}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::distance::DistanceMetric;
    use std::time::Duration;
    use tempfile::TempDir;

    // Creates a throwaway on-disk collection. The returned `TempDir` guard
    // must be kept alive for the collection's lifetime.
    fn test_collection(dim: usize) -> (TempDir, Collection) {
        let dir = TempDir::new().expect("tempdir");
        let path = dir.path().join("test_stream_coll");
        let coll =
            Collection::create(path, dim, DistanceMetric::Cosine).expect("create collection");
        (dir, coll)
    }

    // Builds a minimal point with a constant vector of the given dimension.
    fn make_point(id: u64, dim: usize) -> Point {
        Point {
            id,
            vector: vec![0.1_f32; dim],
            payload: None,
            sparse_vectors: None,
        }
    }

    // try_send succeeds while the channel has spare capacity.
    #[tokio::test]
    async fn test_stream_try_send_succeeds_when_capacity_available() {
        let (_dir, coll) = test_collection(4);
        let config = StreamingConfig {
            buffer_size: 10,
            batch_size: 100,
            flush_interval_ms: 5000,
        };
        let ingester = StreamIngester::new(coll, config);

        let result = ingester.try_send(make_point(1, 4));
        assert!(
            result.is_ok(),
            "try_send should succeed when channel has capacity"
        );

        ingester.shutdown().await;
    }

    // A full channel yields BufferFull. Batch size and flush interval are
    // made huge so the drain task cannot free capacity mid-test.
    #[tokio::test]
    async fn test_stream_try_send_returns_buffer_full_when_at_capacity() {
        let (_dir, coll) = test_collection(4);
        let config = StreamingConfig {
            buffer_size: 2,
            batch_size: 100,
            flush_interval_ms: 60_000,
        };
        let ingester = StreamIngester::new(coll, config);

        assert!(ingester.try_send(make_point(1, 4)).is_ok());
        assert!(ingester.try_send(make_point(2, 4)).is_ok());

        let result = ingester.try_send(make_point(3, 4));
        assert!(result.is_err(), "should return error when buffer full");
        match result.unwrap_err() {
            BackpressureError::BufferFull => {}
            other => panic!("expected BufferFull, got: {other}"),
        }

        ingester.shutdown().await;
    }

    // Reaching batch_size triggers a flush even though the timer (60 s)
    // never fires within the test window.
    #[tokio::test]
    async fn test_stream_drain_flushes_at_batch_size() {
        let (_dir, coll) = test_collection(4);
        let batch_size = 4;
        let config = StreamingConfig {
            buffer_size: 100,
            batch_size,
            flush_interval_ms: 60_000,
        };
        let coll_clone = coll.clone();
        let ingester = StreamIngester::new(coll, config);

        for i in 0..batch_size {
            ingester
                .try_send(make_point(i as u64 + 1, 4))
                .expect("send should succeed");
        }

        // Give the drain task time to receive and flush the full batch.
        tokio::time::sleep(Duration::from_millis(200)).await;

        let results = coll_clone.get(&[1, 2, 3, 4]);
        let found_count = results.iter().filter(|r| r.is_some()).count();
        assert_eq!(
            found_count, 4,
            "all {batch_size} points should be flushed via upsert"
        );

        ingester.shutdown().await;
    }

    // A partial batch (2 of 100) is flushed by the 50 ms interval timer.
    #[tokio::test]
    async fn test_stream_drain_flushes_partial_batch_after_timeout() {
        let (_dir, coll) = test_collection(4);
        let config = StreamingConfig {
            buffer_size: 100,
            batch_size: 100,
            flush_interval_ms: 50,
        };
        let coll_clone = coll.clone();
        let ingester = StreamIngester::new(coll, config);

        ingester.try_send(make_point(1, 4)).expect("send 1");
        ingester.try_send(make_point(2, 4)).expect("send 2");

        tokio::time::sleep(Duration::from_millis(300)).await;

        let results = coll_clone.get(&[1, 2]);
        let found_count = results.iter().filter(|r| r.is_some()).count();
        assert_eq!(
            found_count, 2,
            "partial batch should be flushed after flush_interval_ms"
        );

        ingester.shutdown().await;
    }

    // Neither the size threshold (1000) nor the timer (60 s) can fire, so
    // only the shutdown path can flush these points.
    #[tokio::test]
    async fn test_stream_shutdown_flushes_remaining_points() {
        let (_dir, coll) = test_collection(4);
        let config = StreamingConfig {
            buffer_size: 100,
            batch_size: 1000,
            flush_interval_ms: 60_000,
        };
        let coll_clone = coll.clone();
        let ingester = StreamIngester::new(coll, config);

        ingester.try_send(make_point(10, 4)).expect("send");
        ingester.try_send(make_point(11, 4)).expect("send");

        // Let the drain task pull the points into its in-memory batch.
        tokio::time::sleep(Duration::from_millis(50)).await;

        ingester.shutdown().await;

        let results = coll_clone.get(&[10, 11]);
        let found_count = results.iter().filter(|r| r.is_some()).count();
        assert_eq!(
            found_count, 2,
            "shutdown should flush remaining buffered points"
        );
    }

    // With the delta buffer activated before ingestion starts, flushed
    // points land both in storage and in the delta buffer.
    #[tokio::test]
    async fn test_stream_delta_drain_loop_routes_to_delta_when_active() {
        let (_dir, coll) = test_collection(4);
        let config = StreamingConfig {
            buffer_size: 100,
            batch_size: 4,
            flush_interval_ms: 50,
        };
        let coll_clone = coll.clone();

        coll.delta_buffer.activate();

        let ingester = StreamIngester::new(coll, config);

        for i in 1..=4 {
            ingester
                .try_send(make_point(i, 4))
                .expect("send should succeed");
        }

        tokio::time::sleep(Duration::from_millis(300)).await;

        let results = coll_clone.get(&[1, 2, 3, 4]);
        let found = results.iter().filter(|r| r.is_some()).count();
        assert_eq!(found, 4, "upsert should write all points to storage");

        assert_eq!(
            coll_clone.delta_buffer.len(),
            4,
            "delta buffer should contain the streamed points when active"
        );

        ingester.shutdown().await;
    }

    // Streamed points become visible to search once drained; one-hot
    // vectors make id=1 the unambiguous nearest neighbour of the query.
    #[tokio::test]
    #[allow(clippy::cast_possible_truncation)]
    async fn test_stream_searchable_immediately() {
        let (_dir, coll) = test_collection(4);
        let config = StreamingConfig {
            buffer_size: 100,
            batch_size: 4,
            flush_interval_ms: 50,
        };
        let coll_clone = coll.clone();
        let ingester = StreamIngester::new(coll, config);

        for i in 1..=4u64 {
            let mut vec = vec![0.0_f32; 4];
            vec[(i as usize - 1) % 4] = 1.0;
            let p = Point {
                id: i,
                vector: vec,
                payload: None,
                sparse_vectors: None,
            };
            ingester.try_send(p).expect("send should succeed");
        }

        tokio::time::sleep(Duration::from_millis(300)).await;

        let query = vec![1.0, 0.0, 0.0, 0.0];
        let results = coll_clone.search(&query, 4).expect("search should succeed");
        assert!(
            !results.is_empty(),
            "inserted points should be searchable after drain"
        );
        assert_eq!(results[0].point.id, 1, "closest match should be id=1");

        ingester.shutdown().await;
    }

    // End-to-end: ids 1-5 upserted directly, ids 6-10 streamed while the
    // delta buffer records; all ten must be searchable and exactly the five
    // streamed entries must be in the delta buffer when it is drained.
    #[tokio::test]
    #[allow(clippy::cast_precision_loss)]
    async fn test_stream_delta_rebuild_no_data_loss() {
        let (_dir, coll) = test_collection(4);
        let initial_points: Vec<Point> = (1..=5u64)
            .map(|i| {
                let mut vec = vec![0.0_f32; 4];
                vec[0] = i as f32;
                Point {
                    id: i,
                    vector: vec,
                    payload: None,
                    sparse_vectors: None,
                }
            })
            .collect();
        coll.upsert(initial_points).expect("upsert initial points");

        coll.delta_buffer.activate();
        assert!(coll.delta_buffer.is_active());

        let config = StreamingConfig {
            buffer_size: 100,
            batch_size: 4,
            flush_interval_ms: 50,
        };
        let coll_clone = coll.clone();
        let ingester = StreamIngester::new(coll, config);

        for i in 6..=10u64 {
            let mut vec = vec![0.0_f32; 4];
            vec[0] = i as f32;
            let p = Point {
                id: i,
                vector: vec,
                payload: None,
                sparse_vectors: None,
            };
            ingester.try_send(p).expect("send should succeed");
        }

        tokio::time::sleep(Duration::from_millis(300)).await;

        let query = vec![10.0, 0.0, 0.0, 0.0];
        let results = coll_clone
            .search_ids(&query, 10)
            .expect("search_ids should succeed");
        let found_ids: std::collections::HashSet<u64> = results.iter().map(|(id, _)| *id).collect();

        for id in 1..=10 {
            assert!(
                found_ids.contains(&id),
                "point id={id} should be in search results"
            );
        }

        let drained = coll_clone.delta_buffer.deactivate_and_drain();
        assert!(!coll_clone.delta_buffer.is_active());
        assert_eq!(drained.len(), 5, "delta should have had 5 entries");

        ingester.shutdown().await;
    }
}
593}