Skip to main content

velesdb_server/handlers/points/
streaming.rs

1//! NDJSON streaming upsert and bounded ingestion channel handlers.
2
3use axum::{
4    body::Body,
5    extract::{Path, State},
6    http::StatusCode,
7    response::IntoResponse,
8    Json,
9};
10use futures::StreamExt;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13
14use crate::types::{EnableStreamingRequest, ErrorResponse, StreamInsertRequest};
15use crate::AppState;
16use velesdb_core::{BackpressureError, Point, VectorCollection};
17
18use crate::handlers::helpers::{error_response, get_vector_collection_or_404};
19
/// Number of parsed points at which the pending micro-batch is flushed.
const STREAM_BATCH_SIZE: usize = 100;
/// Time since the previous flush after which a non-empty micro-batch is flushed.
const STREAM_BATCH_MAX_WAIT: Duration = Duration::from_millis(100);
22
/// Running counters collected while an NDJSON stream upsert is processed.
///
/// Every counter starts at zero (`Default`) and is only ever incremented.
#[derive(Default)]
struct StreamUpsertStats {
    /// Points successfully written by `upsert_bulk`.
    inserted: usize,
    /// Non-empty lines that could not be deserialized into a point.
    malformed: usize,
    /// Points belonging to batches whose upsert failed or whose worker task died.
    failed_upserts: usize,
    /// Transport-level errors seen while reading the request body.
    ///
    /// Anything above zero means the body was cut off mid-transfer, so the
    /// reported `inserted` value is only a lower bound relative to what the
    /// client actually sent.
    network_errors: u64,
}
36
37fn parse_ndjson_line(
38    line: &str,
39    batch: &mut Vec<Point>,
40    stats: &mut StreamUpsertStats,
41    point_id_hint: Option<u64>,
42) {
43    if line.is_empty() {
44        return;
45    }
46
47    match serde_json::from_str::<Point>(line) {
48        Ok(point) => batch.push(point),
49        Err(error) => {
50            stats.malformed += 1;
51            // N-2: include point ID in the warning when it is available from context.
52            if let Some(id) = point_id_hint {
53                tracing::warn!(
54                    error = %error,
55                    point_id = id,
56                    "Skipping malformed NDJSON point"
57                );
58            } else {
59                tracing::warn!(error = %error, "Skipping malformed NDJSON point");
60            }
61        }
62    }
63}
64
65/// Flushes a batch and -- if the collection's delta buffer is active -- also
66/// pushes the entries into the buffer for immediate searchability.
67async fn flush_point_batch_with_delta(
68    collection: &VectorCollection,
69    batch: &mut Vec<Point>,
70    stats: &mut StreamUpsertStats,
71) {
72    if batch.is_empty() {
73        return;
74    }
75
76    let points = std::mem::take(batch);
77    let batch_size = points.len();
78
79    // Snapshot (id, vector) pairs for delta before moving `points` into spawn_blocking.
80    // Only allocate when delta is active to keep the hot path allocation-free.
81    #[cfg(feature = "persistence")]
82    let delta_entries: Vec<(u64, Vec<f32>)> = if collection.is_delta_active() {
83        points.iter().map(|p| (p.id, p.vector.clone())).collect()
84    } else {
85        Vec::new()
86    };
87
88    let coll = collection.clone();
89    match tokio::task::spawn_blocking(move || coll.upsert_bulk(&points)).await {
90        Ok(Ok(inserted)) => {
91            stats.inserted += inserted;
92
93            // C-3: push into the delta buffer after a successful upsert so that
94            // search can find these points before HNSW is rebuilt.
95            #[cfg(feature = "persistence")]
96            if !delta_entries.is_empty() {
97                collection.push_to_delta_if_active(&delta_entries);
98            }
99        }
100        Ok(Err(error)) => {
101            stats.failed_upserts += batch_size;
102            tracing::error!(
103                error = %error,
104                batch_size,
105                "Failed to upsert streamed batch"
106            );
107        }
108        Err(error) => {
109            stats.failed_upserts += batch_size;
110            tracing::error!(
111                error = %error,
112                batch_size,
113                "Stream upsert batch task panicked"
114            );
115        }
116    }
117}
118
119/// Stream upsert points using NDJSON.
120///
121/// Accepts a `application/x-ndjson` body. Each line is a JSON-encoded [`Point`].
122/// Points are accumulated into micro-batches and flushed via `upsert_bulk`.
123///
124/// The response body includes a `network_errors` field: a non-zero value means
125/// the HTTP body stream was truncated (e.g., client disconnect or proxy error),
126/// and the server may have received fewer points than the client sent (M-7).
127#[utoipa::path(
128    post,
129    path = "/collections/{name}/points/stream",
130    tag = "points",
131    params(
132        ("name" = String, Path, description = "Collection name")
133    ),
134    request_body(content = String, content_type = "application/x-ndjson", description = "NDJSON stream with one point per line"),
135    responses(
136        (status = 200, description = "Stream processed", body = Object),
137        (status = 404, description = "Collection not found", body = ErrorResponse)
138    )
139)]
140pub async fn stream_upsert_points(
141    State(state): State<Arc<AppState>>,
142    Path(name): Path<String>,
143    body: Body,
144) -> impl IntoResponse {
145    let collection = match get_vector_collection_or_404(&state, &name) {
146        Ok(c) => c,
147        Err(resp) => return resp,
148    };
149
150    let stats = process_ndjson_stream(&collection, body).await;
151
152    if stats.inserted > 0 {
153        state.db.notify_upsert(&name, stats.inserted);
154    }
155
156    Json(serde_json::json!({
157        "message": "Stream processed",
158        "inserted": stats.inserted,
159        "malformed": stats.malformed,
160        "failed_upserts": stats.failed_upserts,
161        "network_errors": stats.network_errors
162    }))
163    .into_response()
164}
165
166/// Pre-parse the `id` field from a JSON line for diagnostic logging.
167fn extract_id_hint(line: &str) -> Option<u64> {
168    serde_json::from_str::<serde_json::Value>(line)
169        .ok()
170        .and_then(|v| v.get("id").and_then(|id| id.as_u64()))
171}
172
173/// Read an NDJSON body stream, batching points and flushing periodically.
174async fn process_ndjson_stream(collection: &VectorCollection, body: Body) -> StreamUpsertStats {
175    let mut stream = body.into_data_stream();
176    let mut buffer = Vec::<u8>::new();
177    let mut batch = Vec::with_capacity(STREAM_BATCH_SIZE);
178    let mut stats = StreamUpsertStats::default();
179    let mut last_flush = Instant::now();
180
181    while let Some(chunk_result) = stream.next().await {
182        match chunk_result {
183            Ok(chunk) => {
184                buffer.extend_from_slice(&chunk);
185                process_buffer_lines(&mut buffer, &mut batch, &mut stats);
186                if should_flush(&batch, last_flush) {
187                    flush_point_batch_with_delta(collection, &mut batch, &mut stats).await;
188                    last_flush = Instant::now();
189                }
190            }
191            Err(error) => {
192                stats.network_errors += 1;
193                tracing::warn!(error = %error, "Error while reading request body stream");
194            }
195        }
196    }
197
198    // Drain any remaining incomplete line in the buffer.
199    if !buffer.is_empty() {
200        let line = String::from_utf8_lossy(&buffer);
201        let id_hint = extract_id_hint(line.trim());
202        parse_ndjson_line(line.trim(), &mut batch, &mut stats, id_hint);
203    }
204
205    flush_point_batch_with_delta(collection, &mut batch, &mut stats).await;
206    stats
207}
208
209/// Extract complete lines from the byte buffer and parse them as NDJSON points.
210fn process_buffer_lines(
211    buffer: &mut Vec<u8>,
212    batch: &mut Vec<Point>,
213    stats: &mut StreamUpsertStats,
214) {
215    while let Some(newline_pos) = buffer.iter().position(|byte| *byte == b'\n') {
216        let line_bytes: Vec<u8> = buffer.drain(..=newline_pos).collect();
217        let line = String::from_utf8_lossy(&line_bytes);
218        let id_hint = extract_id_hint(line.trim());
219        parse_ndjson_line(line.trim(), batch, stats, id_hint);
220    }
221}
222
223/// Check whether the current batch should be flushed.
224fn should_flush(batch: &[Point], last_flush: Instant) -> bool {
225    batch.len() >= STREAM_BATCH_SIZE
226        || (!batch.is_empty() && last_flush.elapsed() >= STREAM_BATCH_MAX_WAIT)
227}
228
229/// Stream-insert a single point via the bounded ingestion channel.
230///
231/// Returns 202 Accepted on success, 429 Too Many Requests when the buffer is
232/// full (with `Retry-After: 1` header per RFC 7231), 503 Service Unavailable
233/// when the drain task has exited, and 404 when the collection is not found.
234///
235/// This handler is `async` to satisfy Axum's handler contract; it does not
236/// perform any async I/O internally (the channel send is non-blocking).
237#[utoipa::path(
238    post,
239    path = "/collections/{name}/stream/insert",
240    tag = "points",
241    params(
242        ("name" = String, Path, description = "Collection name")
243    ),
244    request_body = StreamInsertRequest,
245    responses(
246        (status = 202, description = "Point accepted into streaming buffer"),
247        (status = 429, description = "Streaming buffer full — retry after 1 second", body = ErrorResponse),
248        (status = 503, description = "Streaming drain task has exited — collection must be reconfigured", body = ErrorResponse),
249        (status = 404, description = "Collection not found", body = ErrorResponse),
250        (status = 409, description = "Streaming not configured", body = ErrorResponse)
251    )
252)]
253pub async fn stream_insert(
254    State(state): State<Arc<AppState>>,
255    Path(name): Path<String>,
256    Json(req): Json<StreamInsertRequest>,
257) -> impl IntoResponse {
258    let collection = match get_vector_collection_or_404(&state, &name) {
259        Ok(c) => c,
260        Err(resp) => return resp,
261    };
262
263    if let Err(resp) = validate_stream_dimension(&collection, &req) {
264        return resp;
265    }
266
267    let point = Point::new(req.id, req.vector, req.payload);
268    stream_insert_result_to_response(collection.stream_insert(point))
269}
270
271/// Validate that the request vector dimension matches the collection.
272#[allow(clippy::result_large_err)]
273fn validate_stream_dimension(
274    collection: &VectorCollection,
275    req: &StreamInsertRequest,
276) -> Result<(), axum::response::Response> {
277    let expected_dim = collection.dimension();
278    if req.vector.len() != expected_dim {
279        return Err(error_response(
280            StatusCode::BAD_REQUEST,
281            format!(
282                "Vector dimension mismatch: collection expects {expected_dim}, got {}",
283                req.vector.len()
284            ),
285        ));
286    }
287    Ok(())
288}
289
290/// Convert a `stream_insert` result into an HTTP response.
291fn stream_insert_result_to_response(
292    result: Result<(), BackpressureError>,
293) -> axum::response::Response {
294    match result {
295        Ok(()) => StatusCode::ACCEPTED.into_response(),
296        Err(BackpressureError::BufferFull) => {
297            let mut headers = axum::http::HeaderMap::new();
298            headers.insert("Retry-After", axum::http::HeaderValue::from_static("1"));
299            (
300                StatusCode::TOO_MANY_REQUESTS,
301                headers,
302                Json(ErrorResponse {
303                    error: "Stream buffer full, retry after 1s".to_string(),
304                    code: None,
305                }),
306            )
307                .into_response()
308        }
309        Err(BackpressureError::DrainTaskDead) => error_response(
310            StatusCode::SERVICE_UNAVAILABLE,
311            "Streaming drain task has exited; the collection must be reconfigured".to_string(),
312        ),
313        Err(BackpressureError::NotConfigured) => error_response(
314            StatusCode::CONFLICT,
315            "Streaming not configured for this collection".to_string(),
316        ),
317        Err(e) => error_response(
318            StatusCode::INTERNAL_SERVER_ERROR,
319            format!("Unexpected streaming error: {e}"),
320        ),
321    }
322}
323
324/// Enable streaming ingestion on a collection.
325///
326/// Spawns the background drain task so subsequent calls to
327/// `/collections/{name}/stream/insert` accept points instead of returning a
328/// `409 Conflict` "streaming not configured" error. Omitted body fields fall
329/// back to the engine defaults. Calling this again replaces the existing
330/// ingester (the old drain task is aborted).
331///
332/// Returns `200 OK` on success and `404 Not Found` when the collection does
333/// not exist.
334#[utoipa::path(
335    post,
336    path = "/collections/{name}/stream/enable",
337    tag = "points",
338    params(
339        ("name" = String, Path, description = "Collection name")
340    ),
341    request_body = EnableStreamingRequest,
342    responses(
343        (status = 200, description = "Streaming enabled", body = Object),
344        (status = 404, description = "Collection not found", body = ErrorResponse)
345    )
346)]
347pub async fn enable_streaming(
348    State(state): State<Arc<AppState>>,
349    Path(name): Path<String>,
350    Json(req): Json<EnableStreamingRequest>,
351) -> impl IntoResponse {
352    let collection = match get_vector_collection_or_404(&state, &name) {
353        Ok(c) => c,
354        Err(resp) => return resp,
355    };
356
357    enable_streaming_on(&collection, &req);
358
359    Json(serde_json::json!({
360        "message": "Streaming enabled",
361        "collection": name,
362    }))
363    .into_response()
364}
365
366/// Enables streaming on the collection when persistence is compiled in.
367#[cfg(feature = "persistence")]
368fn enable_streaming_on(collection: &VectorCollection, req: &EnableStreamingRequest) {
369    collection.enable_streaming(req.to_config());
370}
371
372/// No-op when persistence is disabled (the streaming pipeline is unavailable).
373#[cfg(not(feature = "persistence"))]
374fn enable_streaming_on(_collection: &VectorCollection, _req: &EnableStreamingRequest) {}