Skip to main content

velesdb_core/collection/streaming/
ingester.rs

1//! StreamIngester: bounded-channel ingestion with micro-batch drain.
2
3use crate::collection::types::Collection;
4use crate::point::Point;
5
6use std::sync::Arc;
7use tokio::sync::mpsc;
8use tokio::sync::Notify;
9
/// Tuning knobs for the streaming ingestion pipeline.
///
/// These values govern how many points may be queued before callers see
/// backpressure, how large a micro-batch grows before it is written, and how
/// long a partially filled batch may wait before it is written anyway.
///
/// # Defaults
///
/// | Field               | Default value |
/// |---------------------|---------------|
/// | `buffer_size`       | 10 000        |
/// | `batch_size`        | 128           |
/// | `flush_interval_ms` | 50            |
///
/// # Persistence (STREAM-04)
///
/// This configuration currently lives only in memory. A follow-up should
/// store it inside `CollectionConfig` so that `Collection::open` can bring the
/// pipeline back up automatically.
#[derive(Clone, Debug)]
pub struct StreamingConfig {
    /// Maximum number of points the bounded channel holds. Once it is full,
    /// further sends are rejected with backpressure.
    pub buffer_size: usize,

    /// Point count at which the pending micro-batch is written immediately.
    pub batch_size: usize,

    /// Upper bound, in milliseconds, on how long a partial batch waits before
    /// it is written.
    pub flush_interval_ms: u64,
}
38
39impl Default for StreamingConfig {
40    fn default() -> Self {
41        Self {
42            buffer_size: 10_000,
43            batch_size: 128,
44            flush_interval_ms: 50,
45        }
46    }
47}
48
/// Crate-internal tag recording which path produced a write.
///
/// `Api` marks ordinary synchronous upserts issued through the public API.
/// `Streaming` marks writes issued by the micro-batch drain.
///
/// # Status (STREAM-03)
///
/// Nothing consumes this type yet. It is meant to be threaded into the flush
/// pipeline once streaming-specific write paths exist (for example, bypassing
/// the WAL for low-latency inserts), at which point it should be folded into
/// `StreamingConfig`.
// Reason: no call sites until STREAM-03 lands.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum WriteMode {
    /// Ordinary synchronous upsert issued through the API.
    Api,
    /// Write issued by the streaming micro-batch drain.
    Streaming,
}
67
68/// Error returned when the streaming channel cannot accept a point.
69#[derive(Debug, thiserror::Error)]
70#[non_exhaustive]
71pub enum BackpressureError {
72    /// The ingestion buffer is full; the caller should retry after a short delay.
73    #[error("streaming buffer is full (backpressure)")]
74    BufferFull,
75
76    /// Streaming is not configured on this collection.
77    #[error("streaming is not configured on this collection")]
78    NotConfigured,
79
80    /// The drain task has exited; the streaming pipeline is no longer functional.
81    ///
82    /// This is a fatal condition — the server should respond 503 Service Unavailable
83    /// and the collection may need to be reconfigured.
84    #[error("streaming drain task has exited; the ingestion pipeline is dead")]
85    DrainTaskDead,
86}
87
88/// Streaming ingestion handle for a single collection.
89///
90/// Owns a bounded mpsc sender and a background drain task. Points sent via
91/// [`try_send`](Self::try_send) are accumulated into micro-batches and flushed
92/// to the collection's existing `upsert` pipeline.
93///
94/// # Shutdown
95///
96/// Call [`shutdown`](Self::shutdown) to gracefully drain remaining points.
97/// If dropped without shutdown, the drain task is aborted (points in the
98/// channel may be lost).
99pub struct StreamIngester {
100    sender: mpsc::Sender<Point>,
101    config: StreamingConfig,
102    drain_handle: Option<tokio::task::JoinHandle<()>>,
103    shutdown: Arc<Notify>,
104}
105
106impl StreamIngester {
107    /// Creates a new streaming ingester for the given collection.
108    ///
109    /// Spawns a background drain task that accumulates points and flushes
110    /// micro-batches via `Collection::upsert`.
111    #[must_use]
112    pub(crate) fn new(collection: Collection, config: StreamingConfig) -> Self {
113        let (tx, rx) = mpsc::channel(config.buffer_size);
114        let shutdown = Arc::new(Notify::new());
115
116        let drain_handle = tokio::spawn(drain_loop(
117            collection,
118            rx,
119            config.batch_size,
120            config.flush_interval_ms,
121            Arc::clone(&shutdown),
122        ));
123
124        Self {
125            sender: tx,
126            config,
127            drain_handle: Some(drain_handle),
128            shutdown,
129        }
130    }
131
132    /// Attempts to send a point into the streaming channel.
133    ///
134    /// Returns immediately. If the channel is at capacity, returns
135    /// [`BackpressureError::BufferFull`]. If the drain task has exited
136    /// (channel closed), returns [`BackpressureError::DrainTaskDead`].
137    ///
138    /// # Errors
139    ///
140    /// - [`BackpressureError::BufferFull`] — the bounded channel is at capacity.
141    /// - [`BackpressureError::DrainTaskDead`] — the drain task exited unexpectedly.
142    pub fn try_send(&self, point: Point) -> Result<(), BackpressureError> {
143        self.sender.try_send(point).map_err(|e| match e {
144            mpsc::error::TrySendError::Full(_) => BackpressureError::BufferFull,
145            mpsc::error::TrySendError::Closed(_) => BackpressureError::DrainTaskDead,
146        })
147    }
148
149    /// Attempts to send a batch of points into the streaming channel.
150    ///
151    /// Sends points one by one through the bounded channel. If the channel
152    /// fills up mid-batch, returns [`BackpressureError::BufferFull`] — the
153    /// points already sent before the full condition are still queued and
154    /// will be drained normally. If the drain task has exited, returns
155    /// [`BackpressureError::DrainTaskDead`] on the first failed send.
156    ///
157    /// Returns the number of points successfully queued on success (all points).
158    ///
159    /// # Errors
160    ///
161    /// - [`BackpressureError::BufferFull`] — the bounded channel filled mid-batch.
162    /// - [`BackpressureError::DrainTaskDead`] — the drain task exited unexpectedly.
163    pub fn try_send_batch(
164        &self,
165        points: Vec<crate::point::Point>,
166    ) -> Result<usize, BackpressureError> {
167        let count = points.len();
168        for point in points {
169            self.sender.try_send(point).map_err(|e| match e {
170                mpsc::error::TrySendError::Full(_) => BackpressureError::BufferFull,
171                mpsc::error::TrySendError::Closed(_) => BackpressureError::DrainTaskDead,
172            })?;
173        }
174        Ok(count)
175    }
176
177    /// Returns a reference to the streaming configuration.
178    #[must_use]
179    pub fn config(&self) -> &StreamingConfig {
180        &self.config
181    }
182
183    /// Gracefully shuts down the ingester, flushing any remaining buffered points.
184    ///
185    /// This notifies the drain loop to exit and awaits its completion.
186    pub async fn shutdown(mut self) {
187        self.shutdown.notify_one();
188        if let Some(handle) = self.drain_handle.take() {
189            // Ignore JoinError — the drain loop should not panic.
190            let _ = handle.await;
191        }
192    }
193}
194
195impl Drop for StreamIngester {
196    fn drop(&mut self) {
197        // Abort the drain task to prevent orphaned background tasks.
198        // For graceful shutdown with flush, call `shutdown()` before dropping.
199        if let Some(handle) = self.drain_handle.take() {
200            handle.abort();
201        }
202    }
203}
204
205/// Background drain loop that accumulates points and flushes micro-batches.
206///
207/// Uses `tokio::select!` with three branches:
208/// 1. Shutdown notification — flush remaining batch and exit.
209/// 2. Timer tick — flush partial batch if non-empty.
210/// 3. Channel receive — push to batch; flush when `batch_size` reached.
211// Reason: tokio::select! macro expansion inflates cognitive complexity beyond
212// what the actual logic warrants. Each branch delegates to a helper function.
213#[allow(clippy::cognitive_complexity)]
214async fn drain_loop(
215    collection: Collection,
216    mut rx: mpsc::Receiver<Point>,
217    batch_size: usize,
218    flush_interval_ms: u64,
219    shutdown: Arc<Notify>,
220) {
221    let mut batch: Vec<Point> = Vec::with_capacity(batch_size);
222    let mut interval = tokio::time::interval(std::time::Duration::from_millis(flush_interval_ms));
223    // The first tick completes immediately; consume it.
224    interval.tick().await;
225
226    loop {
227        tokio::select! {
228            // Branch 1: shutdown signal — drain remaining channel items in
229            // micro-batches (M-1: flush at batch_size to bound memory usage).
230            () = shutdown.notified() => {
231                drain_on_shutdown(&collection, &mut rx, &mut batch, batch_size).await;
232                break;
233            }
234
235            // Branch 2: timer tick — flush partial batch
236            _ = interval.tick() => {
237                flush_if_non_empty(&collection, &mut batch).await;
238            }
239
240            // Branch 3: receive point from channel
241            msg = rx.recv() => {
242                if !handle_received_point(&collection, &mut batch, batch_size, &mut interval, msg).await {
243                    break;
244                }
245            }
246        }
247    }
248}
249
250/// Drains remaining channel items in micro-batches during shutdown.
251///
252/// Flushes at `batch_size` boundaries to bound memory usage, then
253/// flushes any remaining partial batch.
254async fn drain_on_shutdown(
255    collection: &Collection,
256    rx: &mut mpsc::Receiver<Point>,
257    batch: &mut Vec<Point>,
258    batch_size: usize,
259) {
260    while let Ok(point) = rx.try_recv() {
261        batch.push(point);
262        if batch.len() >= batch_size {
263            flush_batch(collection, batch).await;
264        }
265    }
266    flush_if_non_empty(collection, batch).await;
267}
268
269/// Flushes the batch only when it contains at least one point.
270async fn flush_if_non_empty(collection: &Collection, batch: &mut Vec<Point>) {
271    if !batch.is_empty() {
272        flush_batch(collection, batch).await;
273    }
274}
275
276/// Handles a single received point (or channel-closed signal).
277///
278/// Returns `true` to continue the drain loop, `false` to break.
279async fn handle_received_point(
280    collection: &Collection,
281    batch: &mut Vec<Point>,
282    batch_size: usize,
283    interval: &mut tokio::time::Interval,
284    msg: Option<Point>,
285) -> bool {
286    if let Some(point) = msg {
287        batch.push(point);
288        if batch.len() >= batch_size {
289            flush_batch(collection, batch).await;
290            // Reset the interval so the timer doesn't fire
291            // immediately after a batch-size flush.
292            interval.reset();
293        }
294        true
295    } else {
296        // Channel closed (all senders dropped).
297        flush_if_non_empty(collection, batch).await;
298        false
299    }
300}
301
302/// Flushes the accumulated batch via the collection's existing upsert pipeline.
303///
304/// Runs the blocking upsert on Tokio's blocking thread pool to avoid stalling
305/// the async runtime. If the delta buffer is active (HNSW rebuild in progress),
306/// also pushes the batch vectors into the delta buffer for immediate searchability.
307async fn flush_batch(collection: &Collection, batch: &mut Vec<Point>) {
308    let points: Vec<Point> = std::mem::take(batch);
309
310    // Snapshot vectors for delta buffer before moving points into upsert.
311    // Only allocate if delta is active (common case: delta is inactive).
312    let delta_entries: Vec<(u64, Vec<f32>)> = if collection.delta_buffer.is_active() {
313        points.iter().map(|p| (p.id, p.vector.clone())).collect()
314    } else {
315        Vec::new()
316    };
317
318    let coll = collection.clone();
319    // spawn_blocking wraps the synchronous upsert call (which acquires
320    // multiple RwLocks and does mmap I/O) to prevent blocking the async runtime.
321    let result = tokio::task::spawn_blocking(move || coll.upsert(points)).await;
322    match result {
323        Ok(Ok(())) => {
324            // After successful upsert, push to delta buffer if active.
325            // The upsert wrote to storage+WAL; delta is an additional runtime
326            // copy so search can find these vectors before HNSW is rebuilt.
327            if !delta_entries.is_empty() {
328                collection.delta_buffer.extend(delta_entries);
329            }
330        }
331        Ok(Err(e)) => {
332            tracing::error!("Streaming drain flush failed: {e}");
333        }
334        Err(e) => {
335            tracing::error!("Streaming drain task panicked: {e}");
336        }
337    }
338}
339
340// All ingester tests live in ingester_tests.rs to keep this file under 500 NLOC.
341// The sibling test file `ingester_tests.rs` is registered in `streaming/mod.rs`.