velesdb_server/handlers/points/
streaming.rs1use axum::{
4 body::Body,
5 extract::{Path, State},
6 http::StatusCode,
7 response::IntoResponse,
8 Json,
9};
10use futures::StreamExt;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13
14use crate::types::{EnableStreamingRequest, ErrorResponse, StreamInsertRequest};
15use crate::AppState;
16use velesdb_core::{BackpressureError, Point, VectorCollection};
17
18use crate::handlers::helpers::{error_response, get_vector_collection_or_404};
19
/// Number of buffered points at which a streamed batch is flushed to the
/// collection.
const STREAM_BATCH_SIZE: usize = 100;

/// Maximum age of a non-empty batch before it is flushed even though it has
/// not reached [`STREAM_BATCH_SIZE`]. The age is only evaluated when a new
/// body chunk arrives.
const STREAM_BATCH_MAX_WAIT: Duration = Duration::from_millis(100);
22
/// Counters accumulated while processing one NDJSON upsert stream.
///
/// All counters start at zero (`Default`) and are reported back to the client
/// in the JSON summary of the stream endpoint.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
struct StreamUpsertStats {
    /// Sum of the counts returned by successful bulk upserts.
    inserted: usize,
    /// Number of non-empty lines that failed to deserialize as a point.
    malformed: usize,
    /// Number of points that belonged to a batch whose upsert returned an
    /// error or whose blocking task could not be joined.
    failed_upserts: usize,
    /// Number of errors yielded by the request body stream.
    network_errors: u64,
}
36
37fn parse_ndjson_line(
38 line: &str,
39 batch: &mut Vec<Point>,
40 stats: &mut StreamUpsertStats,
41 point_id_hint: Option<u64>,
42) {
43 if line.is_empty() {
44 return;
45 }
46
47 match serde_json::from_str::<Point>(line) {
48 Ok(point) => batch.push(point),
49 Err(error) => {
50 stats.malformed += 1;
51 if let Some(id) = point_id_hint {
53 tracing::warn!(
54 error = %error,
55 point_id = id,
56 "Skipping malformed NDJSON point"
57 );
58 } else {
59 tracing::warn!(error = %error, "Skipping malformed NDJSON point");
60 }
61 }
62 }
63}
64
65async fn flush_point_batch_with_delta(
68 collection: &VectorCollection,
69 batch: &mut Vec<Point>,
70 stats: &mut StreamUpsertStats,
71) {
72 if batch.is_empty() {
73 return;
74 }
75
76 let points = std::mem::take(batch);
77 let batch_size = points.len();
78
79 #[cfg(feature = "persistence")]
82 let delta_entries: Vec<(u64, Vec<f32>)> = if collection.is_delta_active() {
83 points.iter().map(|p| (p.id, p.vector.clone())).collect()
84 } else {
85 Vec::new()
86 };
87
88 let coll = collection.clone();
89 match tokio::task::spawn_blocking(move || coll.upsert_bulk(&points)).await {
90 Ok(Ok(inserted)) => {
91 stats.inserted += inserted;
92
93 #[cfg(feature = "persistence")]
96 if !delta_entries.is_empty() {
97 collection.push_to_delta_if_active(&delta_entries);
98 }
99 }
100 Ok(Err(error)) => {
101 stats.failed_upserts += batch_size;
102 tracing::error!(
103 error = %error,
104 batch_size,
105 "Failed to upsert streamed batch"
106 );
107 }
108 Err(error) => {
109 stats.failed_upserts += batch_size;
110 tracing::error!(
111 error = %error,
112 batch_size,
113 "Stream upsert batch task panicked"
114 );
115 }
116 }
117}
118
119#[utoipa::path(
128 post,
129 path = "/collections/{name}/points/stream",
130 tag = "points",
131 params(
132 ("name" = String, Path, description = "Collection name")
133 ),
134 request_body(content = String, content_type = "application/x-ndjson", description = "NDJSON stream with one point per line"),
135 responses(
136 (status = 200, description = "Stream processed", body = Object),
137 (status = 404, description = "Collection not found", body = ErrorResponse)
138 )
139)]
140pub async fn stream_upsert_points(
141 State(state): State<Arc<AppState>>,
142 Path(name): Path<String>,
143 body: Body,
144) -> impl IntoResponse {
145 let collection = match get_vector_collection_or_404(&state, &name) {
146 Ok(c) => c,
147 Err(resp) => return resp,
148 };
149
150 let stats = process_ndjson_stream(&collection, body).await;
151
152 if stats.inserted > 0 {
153 state.db.notify_upsert(&name, stats.inserted);
154 }
155
156 Json(serde_json::json!({
157 "message": "Stream processed",
158 "inserted": stats.inserted,
159 "malformed": stats.malformed,
160 "failed_upserts": stats.failed_upserts,
161 "network_errors": stats.network_errors
162 }))
163 .into_response()
164}
165
166fn extract_id_hint(line: &str) -> Option<u64> {
168 serde_json::from_str::<serde_json::Value>(line)
169 .ok()
170 .and_then(|v| v.get("id").and_then(|id| id.as_u64()))
171}
172
173async fn process_ndjson_stream(collection: &VectorCollection, body: Body) -> StreamUpsertStats {
175 let mut stream = body.into_data_stream();
176 let mut buffer = Vec::<u8>::new();
177 let mut batch = Vec::with_capacity(STREAM_BATCH_SIZE);
178 let mut stats = StreamUpsertStats::default();
179 let mut last_flush = Instant::now();
180
181 while let Some(chunk_result) = stream.next().await {
182 match chunk_result {
183 Ok(chunk) => {
184 buffer.extend_from_slice(&chunk);
185 process_buffer_lines(&mut buffer, &mut batch, &mut stats);
186 if should_flush(&batch, last_flush) {
187 flush_point_batch_with_delta(collection, &mut batch, &mut stats).await;
188 last_flush = Instant::now();
189 }
190 }
191 Err(error) => {
192 stats.network_errors += 1;
193 tracing::warn!(error = %error, "Error while reading request body stream");
194 }
195 }
196 }
197
198 if !buffer.is_empty() {
200 let line = String::from_utf8_lossy(&buffer);
201 let id_hint = extract_id_hint(line.trim());
202 parse_ndjson_line(line.trim(), &mut batch, &mut stats, id_hint);
203 }
204
205 flush_point_batch_with_delta(collection, &mut batch, &mut stats).await;
206 stats
207}
208
209fn process_buffer_lines(
211 buffer: &mut Vec<u8>,
212 batch: &mut Vec<Point>,
213 stats: &mut StreamUpsertStats,
214) {
215 while let Some(newline_pos) = buffer.iter().position(|byte| *byte == b'\n') {
216 let line_bytes: Vec<u8> = buffer.drain(..=newline_pos).collect();
217 let line = String::from_utf8_lossy(&line_bytes);
218 let id_hint = extract_id_hint(line.trim());
219 parse_ndjson_line(line.trim(), batch, stats, id_hint);
220 }
221}
222
223fn should_flush(batch: &[Point], last_flush: Instant) -> bool {
225 batch.len() >= STREAM_BATCH_SIZE
226 || (!batch.is_empty() && last_flush.elapsed() >= STREAM_BATCH_MAX_WAIT)
227}
228
229#[utoipa::path(
238 post,
239 path = "/collections/{name}/stream/insert",
240 tag = "points",
241 params(
242 ("name" = String, Path, description = "Collection name")
243 ),
244 request_body = StreamInsertRequest,
245 responses(
246 (status = 202, description = "Point accepted into streaming buffer"),
247 (status = 429, description = "Streaming buffer full — retry after 1 second", body = ErrorResponse),
248 (status = 503, description = "Streaming drain task has exited — collection must be reconfigured", body = ErrorResponse),
249 (status = 404, description = "Collection not found", body = ErrorResponse),
250 (status = 409, description = "Streaming not configured", body = ErrorResponse)
251 )
252)]
253pub async fn stream_insert(
254 State(state): State<Arc<AppState>>,
255 Path(name): Path<String>,
256 Json(req): Json<StreamInsertRequest>,
257) -> impl IntoResponse {
258 let collection = match get_vector_collection_or_404(&state, &name) {
259 Ok(c) => c,
260 Err(resp) => return resp,
261 };
262
263 if let Err(resp) = validate_stream_dimension(&collection, &req) {
264 return resp;
265 }
266
267 let point = Point::new(req.id, req.vector, req.payload);
268 stream_insert_result_to_response(collection.stream_insert(point))
269}
270
271#[allow(clippy::result_large_err)]
273fn validate_stream_dimension(
274 collection: &VectorCollection,
275 req: &StreamInsertRequest,
276) -> Result<(), axum::response::Response> {
277 let expected_dim = collection.dimension();
278 if req.vector.len() != expected_dim {
279 return Err(error_response(
280 StatusCode::BAD_REQUEST,
281 format!(
282 "Vector dimension mismatch: collection expects {expected_dim}, got {}",
283 req.vector.len()
284 ),
285 ));
286 }
287 Ok(())
288}
289
290fn stream_insert_result_to_response(
292 result: Result<(), BackpressureError>,
293) -> axum::response::Response {
294 match result {
295 Ok(()) => StatusCode::ACCEPTED.into_response(),
296 Err(BackpressureError::BufferFull) => {
297 let mut headers = axum::http::HeaderMap::new();
298 headers.insert("Retry-After", axum::http::HeaderValue::from_static("1"));
299 (
300 StatusCode::TOO_MANY_REQUESTS,
301 headers,
302 Json(ErrorResponse {
303 error: "Stream buffer full, retry after 1s".to_string(),
304 code: None,
305 }),
306 )
307 .into_response()
308 }
309 Err(BackpressureError::DrainTaskDead) => error_response(
310 StatusCode::SERVICE_UNAVAILABLE,
311 "Streaming drain task has exited; the collection must be reconfigured".to_string(),
312 ),
313 Err(BackpressureError::NotConfigured) => error_response(
314 StatusCode::CONFLICT,
315 "Streaming not configured for this collection".to_string(),
316 ),
317 Err(e) => error_response(
318 StatusCode::INTERNAL_SERVER_ERROR,
319 format!("Unexpected streaming error: {e}"),
320 ),
321 }
322}
323
324#[utoipa::path(
335 post,
336 path = "/collections/{name}/stream/enable",
337 tag = "points",
338 params(
339 ("name" = String, Path, description = "Collection name")
340 ),
341 request_body = EnableStreamingRequest,
342 responses(
343 (status = 200, description = "Streaming enabled", body = Object),
344 (status = 404, description = "Collection not found", body = ErrorResponse)
345 )
346)]
347pub async fn enable_streaming(
348 State(state): State<Arc<AppState>>,
349 Path(name): Path<String>,
350 Json(req): Json<EnableStreamingRequest>,
351) -> impl IntoResponse {
352 let collection = match get_vector_collection_or_404(&state, &name) {
353 Ok(c) => c,
354 Err(resp) => return resp,
355 };
356
357 enable_streaming_on(&collection, &req);
358
359 Json(serde_json::json!({
360 "message": "Streaming enabled",
361 "collection": name,
362 }))
363 .into_response()
364}
365
366#[cfg(feature = "persistence")]
368fn enable_streaming_on(collection: &VectorCollection, req: &EnableStreamingRequest) {
369 collection.enable_streaming(req.to_config());
370}
371
372#[cfg(not(feature = "persistence"))]
374fn enable_streaming_on(_collection: &VectorCollection, _req: &EnableStreamingRequest) {}