// velesdb_core/collection/streaming/ingester.rs

//! StreamIngester: bounded-channel ingestion with micro-batch drain.

use crate::collection::types::Collection;
use crate::point::Point;

use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::Notify;
/// Configuration for the streaming ingestion pipeline.
///
/// Controls channel capacity, micro-batch sizing, and flush timing.
///
/// # Defaults
///
/// | Parameter          | Default  |
/// |--------------------|----------|
/// | `buffer_size`      | 10 000   |
/// | `batch_size`       | 128      |
/// | `flush_interval_ms`| 50       |
///
/// Future: persist StreamingConfig in CollectionConfig (STREAM-04)
///
/// `StreamingConfig` is currently runtime-only. A future pass should
/// serialize it into `CollectionConfig` so the pipeline is automatically
/// restored on `Collection::open`.
#[derive(Debug, Clone)]
pub struct StreamingConfig {
    /// Capacity of the bounded mpsc channel (backpressure threshold).
    pub buffer_size: usize,

    /// Number of points that trigger an immediate micro-batch flush.
    pub batch_size: usize,

    /// Maximum time (ms) before a partial batch is flushed.
    pub flush_interval_ms: u64,
}

impl Default for StreamingConfig {
    fn default() -> Self {
        Self {
            buffer_size: 10_000,
            batch_size: 128,
            flush_interval_ms: 50,
        }
    }
}
48
/// Internal write mode discriminator (not exposed to users).
///
/// Distinguishes between API-driven writes (synchronous upsert) and
/// streaming-driven writes (micro-batch drain).
///
/// Future: integrate WriteMode into StreamingConfig (STREAM-03)
///
/// `WriteMode` is currently unused. Once streaming-specific write paths
/// (e.g., bypass WAL for low-latency inserts) are implemented, wire this
/// into the flush pipeline.
#[allow(dead_code)] // Tracked: STREAM-03
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum WriteMode {
    /// Standard synchronous API upsert.
    Api,
    /// Streaming micro-batch drain.
    Streaming,
}
67
68/// Error returned when the streaming channel cannot accept a point.
69#[derive(Debug, thiserror::Error)]
70pub enum BackpressureError {
71    /// The ingestion buffer is full; the caller should retry after a short delay.
72    #[error("streaming buffer is full (backpressure)")]
73    BufferFull,
74
75    /// Streaming is not configured on this collection.
76    #[error("streaming is not configured on this collection")]
77    NotConfigured,
78
79    /// The drain task has exited; the streaming pipeline is no longer functional.
80    ///
81    /// This is a fatal condition — the server should respond 503 Service Unavailable
82    /// and the collection may need to be reconfigured.
83    #[error("streaming drain task has exited; the ingestion pipeline is dead")]
84    DrainTaskDead,
85}
86
87/// Streaming ingestion handle for a single collection.
88///
89/// Owns a bounded mpsc sender and a background drain task. Points sent via
90/// [`try_send`](Self::try_send) are accumulated into micro-batches and flushed
91/// to the collection's existing `upsert` pipeline.
92///
93/// # Shutdown
94///
95/// Call [`shutdown`](Self::shutdown) to gracefully drain remaining points.
96/// If dropped without shutdown, the drain task is aborted (points in the
97/// channel may be lost).
98pub struct StreamIngester {
99    sender: mpsc::Sender<Point>,
100    config: StreamingConfig,
101    drain_handle: Option<tokio::task::JoinHandle<()>>,
102    shutdown: Arc<Notify>,
103}
104
105impl StreamIngester {
106    /// Creates a new streaming ingester for the given collection.
107    ///
108    /// Spawns a background drain task that accumulates points and flushes
109    /// micro-batches via `Collection::upsert`.
110    #[must_use]
111    pub fn new(collection: Collection, config: StreamingConfig) -> Self {
112        let (tx, rx) = mpsc::channel(config.buffer_size);
113        let shutdown = Arc::new(Notify::new());
114
115        let drain_handle = tokio::spawn(drain_loop(
116            collection,
117            rx,
118            config.batch_size,
119            config.flush_interval_ms,
120            Arc::clone(&shutdown),
121        ));
122
123        Self {
124            sender: tx,
125            config,
126            drain_handle: Some(drain_handle),
127            shutdown,
128        }
129    }
130
131    /// Attempts to send a point into the streaming channel.
132    ///
133    /// Returns immediately. If the channel is at capacity, returns
134    /// [`BackpressureError::BufferFull`]. If the drain task has exited
135    /// (channel closed), returns [`BackpressureError::DrainTaskDead`].
136    ///
137    /// # Errors
138    ///
139    /// - [`BackpressureError::BufferFull`] — the bounded channel is at capacity.
140    /// - [`BackpressureError::DrainTaskDead`] — the drain task exited unexpectedly.
141    pub fn try_send(&self, point: Point) -> Result<(), BackpressureError> {
142        self.sender.try_send(point).map_err(|e| match e {
143            mpsc::error::TrySendError::Full(_) => BackpressureError::BufferFull,
144            mpsc::error::TrySendError::Closed(_) => BackpressureError::DrainTaskDead,
145        })
146    }
147
148    /// Returns a reference to the streaming configuration.
149    #[must_use]
150    pub fn config(&self) -> &StreamingConfig {
151        &self.config
152    }
153
154    /// Gracefully shuts down the ingester, flushing any remaining buffered points.
155    ///
156    /// This notifies the drain loop to exit and awaits its completion.
157    pub async fn shutdown(mut self) {
158        self.shutdown.notify_one();
159        if let Some(handle) = self.drain_handle.take() {
160            // Ignore JoinError — the drain loop should not panic.
161            let _ = handle.await;
162        }
163    }
164}
165
166impl Drop for StreamIngester {
167    fn drop(&mut self) {
168        // Abort the drain task to prevent orphaned background tasks.
169        // For graceful shutdown with flush, call `shutdown()` before dropping.
170        if let Some(handle) = self.drain_handle.take() {
171            handle.abort();
172        }
173    }
174}
175
176/// Background drain loop that accumulates points and flushes micro-batches.
177///
178/// Uses `tokio::select!` with three branches:
179/// 1. Shutdown notification — flush remaining batch and exit.
180/// 2. Timer tick — flush partial batch if non-empty.
181/// 3. Channel receive — push to batch; flush when `batch_size` reached.
182async fn drain_loop(
183    collection: Collection,
184    mut rx: mpsc::Receiver<Point>,
185    batch_size: usize,
186    flush_interval_ms: u64,
187    shutdown: Arc<Notify>,
188) {
189    let mut batch: Vec<Point> = Vec::with_capacity(batch_size);
190    let mut interval = tokio::time::interval(std::time::Duration::from_millis(flush_interval_ms));
191    // The first tick completes immediately; consume it.
192    interval.tick().await;
193
194    loop {
195        tokio::select! {
196            // Branch 1: shutdown signal — drain remaining channel items in
197            // micro-batches (M-1: flush at batch_size to bound memory usage).
198            () = shutdown.notified() => {
199                while let Ok(point) = rx.try_recv() {
200                    batch.push(point);
201                    // Flush at batch_size boundaries to avoid unbounded accumulation.
202                    if batch.len() >= batch_size {
203                        flush_batch(&collection, &mut batch).await;
204                    }
205                }
206                if !batch.is_empty() {
207                    flush_batch(&collection, &mut batch).await;
208                }
209                break;
210            }
211
212            // Branch 2: timer tick — flush partial batch
213            _ = interval.tick() => {
214                if !batch.is_empty() {
215                    flush_batch(&collection, &mut batch).await;
216                }
217            }
218
219            // Branch 3: receive point from channel
220            msg = rx.recv() => {
221                if let Some(point) = msg {
222                    batch.push(point);
223                    if batch.len() >= batch_size {
224                        flush_batch(&collection, &mut batch).await;
225                        // Reset the interval so the timer doesn't fire
226                        // immediately after a batch-size flush.
227                        interval.reset();
228                    }
229                } else {
230                    // Channel closed (all senders dropped).
231                    if !batch.is_empty() {
232                        flush_batch(&collection, &mut batch).await;
233                    }
234                    break;
235                }
236            }
237        }
238    }
239}
240
241/// Flushes the accumulated batch via the collection's existing upsert pipeline.
242///
243/// Runs the blocking upsert on Tokio's blocking thread pool to avoid stalling
244/// the async runtime. If the delta buffer is active (HNSW rebuild in progress),
245/// also pushes the batch vectors into the delta buffer for immediate searchability.
246async fn flush_batch(collection: &Collection, batch: &mut Vec<Point>) {
247    let points: Vec<Point> = std::mem::take(batch);
248
249    // Snapshot vectors for delta buffer before moving points into upsert.
250    // Only allocate if delta is active (common case: delta is inactive).
251    let delta_entries: Vec<(u64, Vec<f32>)> = if collection.delta_buffer.is_active() {
252        points.iter().map(|p| (p.id, p.vector.clone())).collect()
253    } else {
254        Vec::new()
255    };
256
257    let coll = collection.clone();
258    // spawn_blocking wraps the synchronous upsert call (which acquires
259    // multiple RwLocks and does mmap I/O) to prevent blocking the async runtime.
260    let result = tokio::task::spawn_blocking(move || coll.upsert(points)).await;
261    match result {
262        Ok(Ok(())) => {
263            // After successful upsert, push to delta buffer if active.
264            // The upsert wrote to storage+WAL; delta is an additional runtime
265            // copy so search can find these vectors before HNSW is rebuilt.
266            if !delta_entries.is_empty() {
267                collection.delta_buffer.extend(delta_entries);
268            }
269        }
270        Ok(Err(e)) => {
271            tracing::error!("Streaming drain flush failed: {e}");
272        }
273        Err(e) => {
274            tracing::error!("Streaming drain task panicked: {e}");
275        }
276    }
277}
278
#[cfg(test)]
mod tests {
    use super::*;
    use crate::distance::DistanceMetric;
    use std::time::Duration;
    use tempfile::TempDir;

    /// Helper: create a test collection in a temp directory.
    fn test_collection(dim: usize) -> (TempDir, Collection) {
        let dir = TempDir::new().expect("tempdir");
        let path = dir.path().join("test_stream_coll");
        let coll =
            Collection::create(path, dim, DistanceMetric::Cosine).expect("create collection");
        (dir, coll)
    }

    /// Helper: create a point with the given id and dimension.
    fn make_point(id: u64, dim: usize) -> Point {
        Point {
            id,
            vector: vec![0.1_f32; dim],
            payload: None,
            sparse_vectors: None,
        }
    }

    #[tokio::test]
    async fn test_stream_try_send_succeeds_when_capacity_available() {
        let (_dir, coll) = test_collection(4);
        let config = StreamingConfig {
            buffer_size: 10,
            batch_size: 100,
            flush_interval_ms: 5000,
        };
        let ingester = StreamIngester::new(coll, config);

        let result = ingester.try_send(make_point(1, 4));
        assert!(
            result.is_ok(),
            "try_send should succeed when channel has capacity"
        );

        ingester.shutdown().await;
    }

    #[tokio::test]
    async fn test_stream_try_send_returns_buffer_full_when_at_capacity() {
        let (_dir, coll) = test_collection(4);
        let config = StreamingConfig {
            buffer_size: 2,
            batch_size: 100, // Large batch_size so drain doesn't flush quickly
            flush_interval_ms: 60_000, // Very long interval to avoid timer flush
        };
        let ingester = StreamIngester::new(coll, config);

        // Fill the channel (capacity = 2)
        assert!(ingester.try_send(make_point(1, 4)).is_ok());
        assert!(ingester.try_send(make_point(2, 4)).is_ok());

        // Third send should fail with BufferFull
        let result = ingester.try_send(make_point(3, 4));
        assert!(result.is_err(), "should return error when buffer full");
        match result.unwrap_err() {
            BackpressureError::BufferFull => {} // expected
            other => panic!("expected BufferFull, got: {other}"),
        }

        ingester.shutdown().await;
    }

    #[tokio::test]
    async fn test_stream_drain_flushes_at_batch_size() {
        let (_dir, coll) = test_collection(4);
        let batch_size = 4;
        let config = StreamingConfig {
            buffer_size: 100,
            batch_size,
            flush_interval_ms: 60_000, // Long interval — batch_size should trigger flush
        };
        let coll_clone = coll.clone();
        let ingester = StreamIngester::new(coll, config);

        // Send exactly batch_size points
        for i in 0..batch_size {
            ingester
                .try_send(make_point(i as u64 + 1, 4))
                .expect("send should succeed");
        }

        // Give the drain loop time to process
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify points were upserted
        let results = coll_clone.get(&[1, 2, 3, 4]);
        let found_count = results.iter().filter(|r| r.is_some()).count();
        assert_eq!(
            found_count, 4,
            "all {batch_size} points should be flushed via upsert"
        );

        ingester.shutdown().await;
    }

    #[tokio::test]
    async fn test_stream_drain_flushes_partial_batch_after_timeout() {
        let (_dir, coll) = test_collection(4);
        let config = StreamingConfig {
            buffer_size: 100,
            batch_size: 100, // Very large batch — timer should trigger flush
            flush_interval_ms: 50,
        };
        let coll_clone = coll.clone();
        let ingester = StreamIngester::new(coll, config);

        // Send only 2 points (well below batch_size of 100)
        ingester.try_send(make_point(1, 4)).expect("send 1");
        ingester.try_send(make_point(2, 4)).expect("send 2");

        // Wait longer than flush_interval_ms
        tokio::time::sleep(Duration::from_millis(300)).await;

        // Verify points were flushed by the timer
        let results = coll_clone.get(&[1, 2]);
        let found_count = results.iter().filter(|r| r.is_some()).count();
        assert_eq!(
            found_count, 2,
            "partial batch should be flushed after flush_interval_ms"
        );

        ingester.shutdown().await;
    }

    #[tokio::test]
    async fn test_stream_shutdown_flushes_remaining_points() {
        let (_dir, coll) = test_collection(4);
        let config = StreamingConfig {
            buffer_size: 100,
            batch_size: 1000, // Very large batch — won't trigger batch flush
            flush_interval_ms: 60_000, // Very long — won't trigger timer flush
        };
        let coll_clone = coll.clone();
        let ingester = StreamIngester::new(coll, config);

        // Send a few points
        ingester.try_send(make_point(10, 4)).expect("send");
        ingester.try_send(make_point(11, 4)).expect("send");

        // Sleep long enough to ensure the drain loop has received the points
        // from the channel before shutdown is signalled (N-5: yield_now()
        // is not sufficient — the drain loop may not have been scheduled yet).
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Shutdown should flush remaining
        ingester.shutdown().await;

        // Verify points were flushed during shutdown
        let results = coll_clone.get(&[10, 11]);
        let found_count = results.iter().filter(|r| r.is_some()).count();
        assert_eq!(
            found_count, 2,
            "shutdown should flush remaining buffered points"
        );
    }

    #[tokio::test]
    async fn test_stream_delta_drain_loop_routes_to_delta_when_active() {
        let (_dir, coll) = test_collection(4);
        let config = StreamingConfig {
            buffer_size: 100,
            batch_size: 4,
            flush_interval_ms: 50,
        };
        let coll_clone = coll.clone();

        // Activate delta buffer (simulating HNSW rebuild)
        coll.delta_buffer.activate();

        let ingester = StreamIngester::new(coll, config);

        // Send points via streaming
        for i in 1..=4 {
            ingester
                .try_send(make_point(i, 4))
                .expect("send should succeed");
        }

        // Wait for drain loop to flush
        tokio::time::sleep(Duration::from_millis(300)).await;

        // Verify points are in storage (upsert always writes)
        let results = coll_clone.get(&[1, 2, 3, 4]);
        let found = results.iter().filter(|r| r.is_some()).count();
        assert_eq!(found, 4, "upsert should write all points to storage");

        // Verify points are also in the delta buffer
        assert_eq!(
            coll_clone.delta_buffer.len(),
            4,
            "delta buffer should contain the streamed points when active"
        );

        ingester.shutdown().await;
    }

    #[tokio::test]
    #[allow(clippy::cast_possible_truncation)]
    async fn test_stream_searchable_immediately() {
        let (_dir, coll) = test_collection(4);
        let config = StreamingConfig {
            buffer_size: 100,
            batch_size: 4,
            flush_interval_ms: 50,
        };
        let coll_clone = coll.clone();
        let ingester = StreamIngester::new(coll, config);

        // Insert points with distinct vectors for search
        for i in 1..=4u64 {
            let mut vec = vec![0.0_f32; 4];
            vec[(i as usize - 1) % 4] = 1.0;
            let p = Point {
                id: i,
                vector: vec,
                payload: None,
                sparse_vectors: None,
            };
            ingester.try_send(p).expect("send should succeed");
        }

        // Wait for drain to flush
        tokio::time::sleep(Duration::from_millis(300)).await;

        // Search for the first vector — should find it via HNSW (indexed by upsert)
        let query = vec![1.0, 0.0, 0.0, 0.0];
        let results = coll_clone.search(&query, 4).expect("search should succeed");
        assert!(
            !results.is_empty(),
            "inserted points should be searchable after drain"
        );
        // The closest result to [1,0,0,0] should be id=1
        assert_eq!(results[0].point.id, 1, "closest match should be id=1");

        ingester.shutdown().await;
    }

    #[tokio::test]
    #[allow(clippy::cast_precision_loss)]
    async fn test_stream_delta_rebuild_no_data_loss() {
        let (_dir, coll) = test_collection(4);
        // Pre-insert some points via direct upsert (these go into HNSW)
        let initial_points: Vec<Point> = (1..=5u64)
            .map(|i| {
                let mut vec = vec![0.0_f32; 4];
                vec[0] = i as f32;
                Point {
                    id: i,
                    vector: vec,
                    payload: None,
                    sparse_vectors: None,
                }
            })
            .collect();
        coll.upsert(initial_points).expect("upsert initial points");

        // Simulate rebuild: activate delta buffer
        coll.delta_buffer.activate();
        assert!(coll.delta_buffer.is_active());

        let config = StreamingConfig {
            buffer_size: 100,
            batch_size: 4,
            flush_interval_ms: 50,
        };
        let coll_clone = coll.clone();
        let ingester = StreamIngester::new(coll, config);

        // Insert new points during "rebuild"
        for i in 6..=10u64 {
            let mut vec = vec![0.0_f32; 4];
            vec[0] = i as f32;
            let p = Point {
                id: i,
                vector: vec,
                payload: None,
                sparse_vectors: None,
            };
            ingester.try_send(p).expect("send should succeed");
        }

        // Wait for drain to flush
        tokio::time::sleep(Duration::from_millis(300)).await;

        // Search — should find both HNSW (old) and delta (new) points
        let query = vec![10.0, 0.0, 0.0, 0.0];
        let results = coll_clone
            .search_ids(&query, 10)
            .expect("search_ids should succeed");
        let found_ids: std::collections::HashSet<u64> = results.iter().map(|(id, _)| *id).collect();

        // All 10 points should be found (5 HNSW + 5 delta)
        for id in 1..=10 {
            assert!(
                found_ids.contains(&id),
                "point id={id} should be in search results"
            );
        }

        // Deactivate and drain
        let drained = coll_clone.delta_buffer.deactivate_and_drain();
        assert!(!coll_clone.delta_buffer.is_active());
        assert_eq!(drained.len(), 5, "delta should have had 5 entries");

        ingester.shutdown().await;
    }
}