velesdb_core/collection/streaming/ingester.rs
1//! StreamIngester: bounded-channel ingestion with micro-batch drain.
2
3use crate::collection::types::Collection;
4use crate::point::Point;
5
6use std::sync::Arc;
7use tokio::sync::mpsc;
8use tokio::sync::Notify;
9
/// Tuning knobs for the streaming ingestion pipeline.
///
/// Governs how many points may wait in the bounded channel, how long a
/// micro-batch may grow before it is flushed, and how long a partially
/// filled batch may sit before the timer flushes it.
///
/// # Defaults
///
/// | Parameter           | Default |
/// |---------------------|---------|
/// | `buffer_size`       | 10 000  |
/// | `batch_size`        | 128     |
/// | `flush_interval_ms` | 50      |
///
/// Future: persist StreamingConfig in CollectionConfig (STREAM-04)
///
/// `StreamingConfig` only lives in memory today. A later change should
/// serialize it into `CollectionConfig` so that `Collection::open` can
/// bring the pipeline back automatically.
#[derive(Debug, Clone)]
pub struct StreamingConfig {
    /// Capacity of the bounded mpsc channel; once it is full, senders are
    /// refused with a backpressure error.
    pub buffer_size: usize,

    /// Batch length at which the drain task flushes immediately.
    pub batch_size: usize,

    /// Upper bound, in milliseconds, on how long a partial batch waits
    /// before being flushed.
    pub flush_interval_ms: u64,
}

impl StreamingConfig {
    /// Default channel capacity.
    const DEFAULT_BUFFER_SIZE: usize = 10_000;
    /// Default micro-batch length.
    const DEFAULT_BATCH_SIZE: usize = 128;
    /// Default partial-batch flush period, in milliseconds.
    const DEFAULT_FLUSH_INTERVAL_MS: u64 = 50;
}

impl Default for StreamingConfig {
    fn default() -> Self {
        Self {
            buffer_size: Self::DEFAULT_BUFFER_SIZE,
            batch_size: Self::DEFAULT_BATCH_SIZE,
            flush_interval_ms: Self::DEFAULT_FLUSH_INTERVAL_MS,
        }
    }
}
48
/// Crate-internal tag describing where a write originated.
///
/// Lets the write path tell a synchronous API upsert apart from a
/// micro-batch produced by the streaming drain task. It is not part of the
/// public API.
///
/// Future: integrate WriteMode into StreamingConfig (STREAM-03)
///
/// Nothing reads `WriteMode` yet. It is meant to be threaded through the
/// flush pipeline once streaming-specific write paths exist (for example,
/// a low-latency insert that skips the WAL).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)] // Tracked: STREAM-03
pub(crate) enum WriteMode {
    /// Write issued through the regular synchronous upsert API.
    Api,
    /// Write issued by the streaming micro-batch drain.
    Streaming,
}
67
68/// Error returned when the streaming channel cannot accept a point.
69#[derive(Debug, thiserror::Error)]
70pub enum BackpressureError {
71 /// The ingestion buffer is full; the caller should retry after a short delay.
72 #[error("streaming buffer is full (backpressure)")]
73 BufferFull,
74
75 /// Streaming is not configured on this collection.
76 #[error("streaming is not configured on this collection")]
77 NotConfigured,
78
79 /// The drain task has exited; the streaming pipeline is no longer functional.
80 ///
81 /// This is a fatal condition — the server should respond 503 Service Unavailable
82 /// and the collection may need to be reconfigured.
83 #[error("streaming drain task has exited; the ingestion pipeline is dead")]
84 DrainTaskDead,
85}
86
87/// Streaming ingestion handle for a single collection.
88///
89/// Owns a bounded mpsc sender and a background drain task. Points sent via
90/// [`try_send`](Self::try_send) are accumulated into micro-batches and flushed
91/// to the collection's existing `upsert` pipeline.
92///
93/// # Shutdown
94///
95/// Call [`shutdown`](Self::shutdown) to gracefully drain remaining points.
96/// If dropped without shutdown, the drain task is aborted (points in the
97/// channel may be lost).
98pub struct StreamIngester {
99 sender: mpsc::Sender<Point>,
100 config: StreamingConfig,
101 drain_handle: Option<tokio::task::JoinHandle<()>>,
102 shutdown: Arc<Notify>,
103}
104
105impl StreamIngester {
106 /// Creates a new streaming ingester for the given collection.
107 ///
108 /// Spawns a background drain task that accumulates points and flushes
109 /// micro-batches via `Collection::upsert`.
110 #[must_use]
111 pub fn new(collection: Collection, config: StreamingConfig) -> Self {
112 let (tx, rx) = mpsc::channel(config.buffer_size);
113 let shutdown = Arc::new(Notify::new());
114
115 let drain_handle = tokio::spawn(drain_loop(
116 collection,
117 rx,
118 config.batch_size,
119 config.flush_interval_ms,
120 Arc::clone(&shutdown),
121 ));
122
123 Self {
124 sender: tx,
125 config,
126 drain_handle: Some(drain_handle),
127 shutdown,
128 }
129 }
130
131 /// Attempts to send a point into the streaming channel.
132 ///
133 /// Returns immediately. If the channel is at capacity, returns
134 /// [`BackpressureError::BufferFull`]. If the drain task has exited
135 /// (channel closed), returns [`BackpressureError::DrainTaskDead`].
136 ///
137 /// # Errors
138 ///
139 /// - [`BackpressureError::BufferFull`] — the bounded channel is at capacity.
140 /// - [`BackpressureError::DrainTaskDead`] — the drain task exited unexpectedly.
141 pub fn try_send(&self, point: Point) -> Result<(), BackpressureError> {
142 self.sender.try_send(point).map_err(|e| match e {
143 mpsc::error::TrySendError::Full(_) => BackpressureError::BufferFull,
144 mpsc::error::TrySendError::Closed(_) => BackpressureError::DrainTaskDead,
145 })
146 }
147
148 /// Attempts to send a batch of points into the streaming channel.
149 ///
150 /// Sends points one by one through the bounded channel. If the channel
151 /// fills up mid-batch, returns [`BackpressureError::BufferFull`] — the
152 /// points already sent before the full condition are still queued and
153 /// will be drained normally. If the drain task has exited, returns
154 /// [`BackpressureError::DrainTaskDead`] on the first failed send.
155 ///
156 /// Returns the number of points successfully queued on success (all points).
157 ///
158 /// # Errors
159 ///
160 /// - [`BackpressureError::BufferFull`] — the bounded channel filled mid-batch.
161 /// - [`BackpressureError::DrainTaskDead`] — the drain task exited unexpectedly.
162 pub fn try_send_batch(
163 &self,
164 points: Vec<crate::point::Point>,
165 ) -> Result<usize, BackpressureError> {
166 let count = points.len();
167 for point in points {
168 self.sender.try_send(point).map_err(|e| match e {
169 mpsc::error::TrySendError::Full(_) => BackpressureError::BufferFull,
170 mpsc::error::TrySendError::Closed(_) => BackpressureError::DrainTaskDead,
171 })?;
172 }
173 Ok(count)
174 }
175
176 /// Returns a reference to the streaming configuration.
177 #[must_use]
178 pub fn config(&self) -> &StreamingConfig {
179 &self.config
180 }
181
182 /// Gracefully shuts down the ingester, flushing any remaining buffered points.
183 ///
184 /// This notifies the drain loop to exit and awaits its completion.
185 pub async fn shutdown(mut self) {
186 self.shutdown.notify_one();
187 if let Some(handle) = self.drain_handle.take() {
188 // Ignore JoinError — the drain loop should not panic.
189 let _ = handle.await;
190 }
191 }
192}
193
194impl Drop for StreamIngester {
195 fn drop(&mut self) {
196 // Abort the drain task to prevent orphaned background tasks.
197 // For graceful shutdown with flush, call `shutdown()` before dropping.
198 if let Some(handle) = self.drain_handle.take() {
199 handle.abort();
200 }
201 }
202}
203
204/// Background drain loop that accumulates points and flushes micro-batches.
205///
206/// Uses `tokio::select!` with three branches:
207/// 1. Shutdown notification — flush remaining batch and exit.
208/// 2. Timer tick — flush partial batch if non-empty.
209/// 3. Channel receive — push to batch; flush when `batch_size` reached.
210async fn drain_loop(
211 collection: Collection,
212 mut rx: mpsc::Receiver<Point>,
213 batch_size: usize,
214 flush_interval_ms: u64,
215 shutdown: Arc<Notify>,
216) {
217 let mut batch: Vec<Point> = Vec::with_capacity(batch_size);
218 let mut interval = tokio::time::interval(std::time::Duration::from_millis(flush_interval_ms));
219 // The first tick completes immediately; consume it.
220 interval.tick().await;
221
222 loop {
223 tokio::select! {
224 // Branch 1: shutdown signal — drain remaining channel items in
225 // micro-batches (M-1: flush at batch_size to bound memory usage).
226 () = shutdown.notified() => {
227 while let Ok(point) = rx.try_recv() {
228 batch.push(point);
229 // Flush at batch_size boundaries to avoid unbounded accumulation.
230 if batch.len() >= batch_size {
231 flush_batch(&collection, &mut batch).await;
232 }
233 }
234 if !batch.is_empty() {
235 flush_batch(&collection, &mut batch).await;
236 }
237 break;
238 }
239
240 // Branch 2: timer tick — flush partial batch
241 _ = interval.tick() => {
242 if !batch.is_empty() {
243 flush_batch(&collection, &mut batch).await;
244 }
245 }
246
247 // Branch 3: receive point from channel
248 msg = rx.recv() => {
249 if let Some(point) = msg {
250 batch.push(point);
251 if batch.len() >= batch_size {
252 flush_batch(&collection, &mut batch).await;
253 // Reset the interval so the timer doesn't fire
254 // immediately after a batch-size flush.
255 interval.reset();
256 }
257 } else {
258 // Channel closed (all senders dropped).
259 if !batch.is_empty() {
260 flush_batch(&collection, &mut batch).await;
261 }
262 break;
263 }
264 }
265 }
266 }
267}
268
269/// Flushes the accumulated batch via the collection's existing upsert pipeline.
270///
271/// Runs the blocking upsert on Tokio's blocking thread pool to avoid stalling
272/// the async runtime. If the delta buffer is active (HNSW rebuild in progress),
273/// also pushes the batch vectors into the delta buffer for immediate searchability.
274async fn flush_batch(collection: &Collection, batch: &mut Vec<Point>) {
275 let points: Vec<Point> = std::mem::take(batch);
276
277 // Snapshot vectors for delta buffer before moving points into upsert.
278 // Only allocate if delta is active (common case: delta is inactive).
279 let delta_entries: Vec<(u64, Vec<f32>)> = if collection.delta_buffer.is_active() {
280 points.iter().map(|p| (p.id, p.vector.clone())).collect()
281 } else {
282 Vec::new()
283 };
284
285 let coll = collection.clone();
286 // spawn_blocking wraps the synchronous upsert call (which acquires
287 // multiple RwLocks and does mmap I/O) to prevent blocking the async runtime.
288 let result = tokio::task::spawn_blocking(move || coll.upsert(points)).await;
289 match result {
290 Ok(Ok(())) => {
291 // After successful upsert, push to delta buffer if active.
292 // The upsert wrote to storage+WAL; delta is an additional runtime
293 // copy so search can find these vectors before HNSW is rebuilt.
294 if !delta_entries.is_empty() {
295 collection.delta_buffer.extend(delta_entries);
296 }
297 }
298 Ok(Err(e)) => {
299 tracing::error!("Streaming drain flush failed: {e}");
300 }
301 Err(e) => {
302 tracing::error!("Streaming drain task panicked: {e}");
303 }
304 }
305}
306
307// All ingester tests live in ingester_tests.rs to keep this file under 500 NLOC.
308// The sibling test file `ingester_tests.rs` is registered in `streaming/mod.rs`.