Skip to main content

velesdb_core/collection/streaming/
ingester.rs

1//! StreamIngester: bounded-channel ingestion with micro-batch drain.
2
3use crate::collection::types::Collection;
4use crate::point::Point;
5
6use serde::{Deserialize, Serialize};
7use std::sync::Arc;
8use tokio::sync::mpsc;
9use tokio::sync::Notify;
10
11/// Configuration for the streaming ingestion pipeline.
12///
13/// Controls channel capacity, micro-batch sizing, and flush timing.
14///
15/// # Defaults
16///
17/// | Parameter          | Default  |
18/// |--------------------|----------|
19/// | `buffer_size`      | 10 000   |
20/// | `batch_size`       | 128      |
21/// | `flush_interval_ms`| 50       |
22///
23/// Persisted into `CollectionConfig` (schema v2+). The struct only describes
24/// the pipeline shape (sizes/timing); the live `StreamIngester` is still
25/// created on demand via `Collection::enable_streaming`. Each field carries a
26/// serde default matching [`Default`] so an older or partial `config.json`
27/// deserializes without error.
28#[derive(Debug, Clone, Serialize, Deserialize)]
29#[non_exhaustive]
30pub struct StreamingConfig {
31    /// Capacity of the bounded mpsc channel (backpressure threshold).
32    #[serde(default = "default_buffer_size")]
33    pub buffer_size: usize,
34
35    /// Number of points that trigger an immediate micro-batch flush.
36    #[serde(default = "default_batch_size")]
37    pub batch_size: usize,
38
39    /// Maximum time (ms) before a partial batch is flushed.
40    #[serde(default = "default_flush_interval_ms")]
41    pub flush_interval_ms: u64,
42}
43
/// Serde fallback for `StreamingConfig::buffer_size` (10 000 queued points).
fn default_buffer_size() -> usize {
    const DEFAULT_BUFFER_SIZE: usize = 10_000;
    DEFAULT_BUFFER_SIZE
}
47
/// Serde fallback for `StreamingConfig::batch_size` (128 points per micro-batch).
fn default_batch_size() -> usize {
    const DEFAULT_BATCH_SIZE: usize = 128;
    DEFAULT_BATCH_SIZE
}
51
/// Serde fallback for `StreamingConfig::flush_interval_ms` (50 milliseconds).
fn default_flush_interval_ms() -> u64 {
    const DEFAULT_FLUSH_INTERVAL_MS: u64 = 50;
    DEFAULT_FLUSH_INTERVAL_MS
}
55
56impl Default for StreamingConfig {
57    fn default() -> Self {
58        Self {
59            buffer_size: default_buffer_size(),
60            batch_size: default_batch_size(),
61            flush_interval_ms: default_flush_interval_ms(),
62        }
63    }
64}
65
66impl StreamingConfig {
67    /// Constructs a config from explicit pipeline parameters.
68    ///
69    /// Provided because the struct is `#[non_exhaustive]` (future fields use
70    /// serde defaults), so downstream crates cannot use a struct literal.
71    #[must_use]
72    pub fn new(buffer_size: usize, batch_size: usize, flush_interval_ms: u64) -> Self {
73        Self {
74            buffer_size,
75            batch_size,
76            flush_interval_ms,
77        }
78    }
79}
80
/// Internal discriminator for the origin of a write (never exposed to users).
///
/// Separates writes issued through the synchronous API upsert from writes
/// produced by the streaming micro-batch drain.
///
/// `WriteMode` is currently unused. Planned follow-up (STREAM-03): integrate
/// it into `StreamingConfig` and wire it into the flush pipeline once
/// streaming-specific write paths (e.g., bypassing the WAL for low-latency
/// inserts) are implemented.
#[allow(dead_code)] // Tracked: STREAM-03
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum WriteMode {
    /// Write issued through the standard synchronous API upsert.
    Api,
    /// Write produced by the streaming micro-batch drain.
    Streaming,
}
99
100/// Error returned when the streaming channel cannot accept a point.
101#[derive(Debug, thiserror::Error)]
102#[non_exhaustive]
103pub enum BackpressureError {
104    /// The ingestion buffer is full; the caller should retry after a short delay.
105    #[error("streaming buffer is full (backpressure)")]
106    BufferFull,
107
108    /// Streaming is not configured on this collection.
109    #[error("streaming is not configured on this collection")]
110    NotConfigured,
111
112    /// The drain task has exited; the streaming pipeline is no longer functional.
113    ///
114    /// This is a fatal condition — the server should respond 503 Service Unavailable
115    /// and the collection may need to be reconfigured.
116    #[error("streaming drain task has exited; the ingestion pipeline is dead")]
117    DrainTaskDead,
118}
119
120/// Streaming ingestion handle for a single collection.
121///
122/// Owns a bounded mpsc sender and a background drain task. Points sent via
123/// [`try_send`](Self::try_send) are accumulated into micro-batches and flushed
124/// to the collection's existing `upsert` pipeline.
125///
126/// # Shutdown
127///
128/// Call [`shutdown`](Self::shutdown) to gracefully drain remaining points.
129/// If dropped without shutdown, the drain task is aborted (points in the
130/// channel may be lost).
131pub struct StreamIngester {
132    sender: mpsc::Sender<Point>,
133    config: StreamingConfig,
134    drain_handle: Option<tokio::task::JoinHandle<()>>,
135    shutdown: Arc<Notify>,
136}
137
138impl StreamIngester {
139    /// Creates a new streaming ingester for the given collection.
140    ///
141    /// Spawns a background drain task that accumulates points and flushes
142    /// micro-batches via `Collection::upsert`.
143    #[must_use]
144    pub(crate) fn new(collection: Collection, config: StreamingConfig) -> Self {
145        let (tx, rx) = mpsc::channel(config.buffer_size);
146        let shutdown = Arc::new(Notify::new());
147
148        let drain_handle = tokio::spawn(drain_loop(
149            collection,
150            rx,
151            config.batch_size,
152            config.flush_interval_ms,
153            Arc::clone(&shutdown),
154        ));
155
156        Self {
157            sender: tx,
158            config,
159            drain_handle: Some(drain_handle),
160            shutdown,
161        }
162    }
163
164    /// Attempts to send a point into the streaming channel.
165    ///
166    /// Returns immediately. If the channel is at capacity, returns
167    /// [`BackpressureError::BufferFull`]. If the drain task has exited
168    /// (channel closed), returns [`BackpressureError::DrainTaskDead`].
169    ///
170    /// # Errors
171    ///
172    /// - [`BackpressureError::BufferFull`] — the bounded channel is at capacity.
173    /// - [`BackpressureError::DrainTaskDead`] — the drain task exited unexpectedly.
174    pub fn try_send(&self, point: Point) -> Result<(), BackpressureError> {
175        self.sender.try_send(point).map_err(|e| match e {
176            mpsc::error::TrySendError::Full(_) => BackpressureError::BufferFull,
177            mpsc::error::TrySendError::Closed(_) => BackpressureError::DrainTaskDead,
178        })
179    }
180
181    /// Attempts to send a batch of points into the streaming channel.
182    ///
183    /// Sends points one by one through the bounded channel. If the channel
184    /// fills up mid-batch, returns [`BackpressureError::BufferFull`] — the
185    /// points already sent before the full condition are still queued and
186    /// will be drained normally. If the drain task has exited, returns
187    /// [`BackpressureError::DrainTaskDead`] on the first failed send.
188    ///
189    /// Returns the number of points successfully queued on success (all points).
190    ///
191    /// # Errors
192    ///
193    /// - [`BackpressureError::BufferFull`] — the bounded channel filled mid-batch.
194    /// - [`BackpressureError::DrainTaskDead`] — the drain task exited unexpectedly.
195    pub fn try_send_batch(
196        &self,
197        points: Vec<crate::point::Point>,
198    ) -> Result<usize, BackpressureError> {
199        let count = points.len();
200        for point in points {
201            self.sender.try_send(point).map_err(|e| match e {
202                mpsc::error::TrySendError::Full(_) => BackpressureError::BufferFull,
203                mpsc::error::TrySendError::Closed(_) => BackpressureError::DrainTaskDead,
204            })?;
205        }
206        Ok(count)
207    }
208
209    /// Returns a reference to the streaming configuration.
210    #[must_use]
211    pub fn config(&self) -> &StreamingConfig {
212        &self.config
213    }
214
215    /// Gracefully shuts down the ingester, flushing any remaining buffered points.
216    ///
217    /// This notifies the drain loop to exit and awaits its completion.
218    pub async fn shutdown(mut self) {
219        self.shutdown.notify_one();
220        if let Some(handle) = self.drain_handle.take() {
221            // Ignore JoinError — the drain loop should not panic.
222            let _ = handle.await;
223        }
224    }
225}
226
227impl Drop for StreamIngester {
228    fn drop(&mut self) {
229        // Abort the drain task to prevent orphaned background tasks.
230        // For graceful shutdown with flush, call `shutdown()` before dropping.
231        if let Some(handle) = self.drain_handle.take() {
232            handle.abort();
233        }
234    }
235}
236
237/// Background drain loop that accumulates points and flushes micro-batches.
238///
239/// Uses `tokio::select!` with three branches:
240/// 1. Shutdown notification — flush remaining batch and exit.
241/// 2. Timer tick — flush partial batch if non-empty.
242/// 3. Channel receive — push to batch; flush when `batch_size` reached.
243// Reason: tokio::select! macro expansion inflates cognitive complexity beyond
244// what the actual logic warrants. Each branch delegates to a helper function.
245#[allow(clippy::cognitive_complexity)]
246async fn drain_loop(
247    collection: Collection,
248    mut rx: mpsc::Receiver<Point>,
249    batch_size: usize,
250    flush_interval_ms: u64,
251    shutdown: Arc<Notify>,
252) {
253    let mut batch: Vec<Point> = Vec::with_capacity(batch_size);
254    let mut interval = tokio::time::interval(std::time::Duration::from_millis(flush_interval_ms));
255    // The first tick completes immediately; consume it.
256    interval.tick().await;
257
258    loop {
259        tokio::select! {
260            // Branch 1: shutdown signal — drain remaining channel items in
261            // micro-batches (M-1: flush at batch_size to bound memory usage).
262            () = shutdown.notified() => {
263                drain_on_shutdown(&collection, &mut rx, &mut batch, batch_size).await;
264                break;
265            }
266
267            // Branch 2: timer tick — flush partial batch
268            _ = interval.tick() => {
269                flush_if_non_empty(&collection, &mut batch).await;
270            }
271
272            // Branch 3: receive point from channel
273            msg = rx.recv() => {
274                if !handle_received_point(&collection, &mut batch, batch_size, &mut interval, msg).await {
275                    break;
276                }
277            }
278        }
279    }
280}
281
282/// Drains remaining channel items in micro-batches during shutdown.
283///
284/// Flushes at `batch_size` boundaries to bound memory usage, then
285/// flushes any remaining partial batch.
286async fn drain_on_shutdown(
287    collection: &Collection,
288    rx: &mut mpsc::Receiver<Point>,
289    batch: &mut Vec<Point>,
290    batch_size: usize,
291) {
292    while let Ok(point) = rx.try_recv() {
293        batch.push(point);
294        if batch.len() >= batch_size {
295            flush_batch(collection, batch).await;
296        }
297    }
298    flush_if_non_empty(collection, batch).await;
299}
300
301/// Flushes the batch only when it contains at least one point.
302async fn flush_if_non_empty(collection: &Collection, batch: &mut Vec<Point>) {
303    if !batch.is_empty() {
304        flush_batch(collection, batch).await;
305    }
306}
307
308/// Handles a single received point (or channel-closed signal).
309///
310/// Returns `true` to continue the drain loop, `false` to break.
311async fn handle_received_point(
312    collection: &Collection,
313    batch: &mut Vec<Point>,
314    batch_size: usize,
315    interval: &mut tokio::time::Interval,
316    msg: Option<Point>,
317) -> bool {
318    if let Some(point) = msg {
319        batch.push(point);
320        if batch.len() >= batch_size {
321            flush_batch(collection, batch).await;
322            // Reset the interval so the timer doesn't fire
323            // immediately after a batch-size flush.
324            interval.reset();
325        }
326        true
327    } else {
328        // Channel closed (all senders dropped).
329        flush_if_non_empty(collection, batch).await;
330        false
331    }
332}
333
334/// Flushes the accumulated batch via the collection's existing upsert pipeline.
335///
336/// Runs the blocking upsert on Tokio's blocking thread pool to avoid stalling
337/// the async runtime. If the delta buffer is active (HNSW rebuild in progress),
338/// also pushes the batch vectors into the delta buffer for immediate searchability.
339async fn flush_batch(collection: &Collection, batch: &mut Vec<Point>) {
340    let points: Vec<Point> = std::mem::take(batch);
341
342    // Snapshot vectors for delta buffer before moving points into upsert.
343    // Only allocate if delta is active (common case: delta is inactive).
344    let delta_entries: Vec<(u64, Vec<f32>)> = if collection.delta_buffer.is_active() {
345        points.iter().map(|p| (p.id, p.vector.clone())).collect()
346    } else {
347        Vec::new()
348    };
349
350    let coll = collection.clone();
351    // spawn_blocking wraps the synchronous upsert call (which acquires
352    // multiple RwLocks and does mmap I/O) to prevent blocking the async runtime.
353    let result = tokio::task::spawn_blocking(move || coll.upsert(points)).await;
354    match result {
355        Ok(Ok(())) => {
356            // After successful upsert, push to delta buffer if active.
357            // The upsert wrote to storage+WAL; delta is an additional runtime
358            // copy so search can find these vectors before HNSW is rebuilt.
359            if !delta_entries.is_empty() {
360                collection.delta_buffer.extend(delta_entries);
361            }
362        }
363        Ok(Err(e)) => {
364            tracing::error!("Streaming drain flush failed: {e}");
365        }
366        Err(e) => {
367            tracing::error!("Streaming drain task panicked: {e}");
368        }
369    }
370}
371
372// All ingester tests live in ingester_tests.rs to keep this file under 500 NLOC.
373// The sibling test file `ingester_tests.rs` is registered in `streaming/mod.rs`.