alopex_server/http/
vector.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Instant;
4
5use alopex_core::kv::{KVStore, KVTransaction};
6use alopex_core::types::TxnMode;
7use alopex_core::vector::hnsw::HnswIndex;
8use alopex_sql::storage::AsyncSqlTransaction;
9use axum::extract::Extension;
10use axum::response::Response;
11use axum::Json;
12use serde::{Deserialize, Serialize};
13
14use crate::error::{Result, ServerError};
15use crate::http::{error_response, json_response, RequestContext};
16use crate::server::ServerState;
17
/// JSON body accepted by the [`search`] handler.
#[derive(Debug, Deserialize)]
pub struct VectorSearchRequest {
    /// Name of the table to search; looked up in the catalog.
    pub table: String,
    /// Query vector; rendered as a `[a, b, ...]` literal inside the generated SQL.
    pub vector: Vec<f32>,
    /// Value used for the SQL `LIMIT`; falls back to `default_k()` when omitted.
    #[serde(default = "default_k")]
    pub k: usize,
    /// Optional index name. NOTE(review): not read by `search_impl` in this file.
    pub index: Option<String>,
    /// Optional vector column name; when absent the first vector-typed column
    /// of the table is used.
    pub column: Option<String>,
}
27
/// JSON body accepted by the [`upsert`] handler.
#[derive(Debug, Deserialize)]
pub struct VectorUpsertRequest {
    /// Name of the target table; looked up in the catalog.
    pub table: String,
    /// Primary-key value of the row to insert or update.
    pub id: u64,
    /// Vector value to store in the vector column.
    pub vector: Vec<f32>,
    /// Optional vector column name; when absent the first vector-typed column
    /// of the table is used.
    pub column: Option<String>,
}
35
/// JSON body accepted by the [`delete`] handler.
#[derive(Debug, Deserialize)]
pub struct VectorDeleteRequest {
    /// Name of the target table; looked up in the catalog.
    pub table: String,
    /// Primary-key value of the row to delete.
    pub id: u64,
    /// Optional vector column name. `delete_impl` only uses it while resolving
    /// the table; the generated `DELETE` filters on the primary key alone.
    pub column: Option<String>,
}
42
/// JSON body accepted by the [`index_create`] handler.
#[derive(Debug, Deserialize)]
pub struct VectorIndexCreateRequest {
    /// Name of the index to create.
    pub name: String,
    /// Table the index is created on.
    pub table: String,
    /// Column the index is created on.
    pub column: String,
    /// Optional index method; `build_create_index_sql` accepts `hnsw` and
    /// `btree` (case-insensitive) and rejects anything else.
    pub method: Option<String>,
    /// Key/value options rendered into a `WITH (...)` clause; empty by default.
    #[serde(default)]
    pub options: HashMap<String, String>,
    /// When `true`, `IF NOT EXISTS` is added to the statement; defaults to `false`.
    #[serde(default)]
    pub if_not_exists: bool,
}
54
/// JSON body accepted by the [`index_update`] handler.
///
/// `index_update_impl` turns this into a `DROP INDEX IF EXISTS` followed by a
/// `CREATE INDEX` built from the same fields.
#[derive(Debug, Deserialize)]
pub struct VectorIndexUpdateRequest {
    /// Name of the index to replace.
    pub name: String,
    /// Table the index is created on.
    pub table: String,
    /// Column the index is created on.
    pub column: String,
    /// Optional index method; `build_create_index_sql` accepts `hnsw` and
    /// `btree` (case-insensitive) and rejects anything else.
    pub method: Option<String>,
    /// Key/value options rendered into a `WITH (...)` clause; empty by default.
    #[serde(default)]
    pub options: HashMap<String, String>,
}
64
/// JSON body accepted by the [`index_delete`] handler.
#[derive(Debug, Deserialize)]
pub struct VectorIndexDeleteRequest {
    /// Name of the index to drop.
    pub name: String,
    /// When `true`, `IF EXISTS` is added to the `DROP INDEX`; defaults to `false`.
    #[serde(default)]
    pub if_exists: bool,
}
71
/// JSON body accepted by the [`index_compact`] handler.
#[derive(Debug, Deserialize)]
pub struct VectorIndexCompactRequest {
    /// Name of the index to compact; `index_compact_impl` requires it to be an
    /// HNSW index present in the catalog.
    pub name: String,
}
76
/// One row returned by a vector search.
#[derive(Debug, Serialize)]
pub struct VectorSearchResult {
    /// Primary-key value of the row, converted to `u64` by `search_impl`.
    pub id: u64,
    /// Value of the `score` column (`vector_similarity(...)`) for this row.
    /// NOTE(review): despite the field name, whether this is a distance or a
    /// similarity depends on the column's metric — confirm against
    /// `vector_similarity`'s contract.
    pub distance: f32,
    /// Remaining column values of the row after the trailing score column
    /// has been removed.
    pub row: Vec<alopex_sql::storage::SqlValue>,
}
83
/// Response payload produced by `search_impl`.
#[derive(Debug, Serialize)]
pub struct VectorSearchResponse {
    /// Matching rows in the order returned by the SQL query.
    pub results: Vec<VectorSearchResult>,
}
88
/// Response payload produced by `upsert_impl`.
#[derive(Debug, Serialize)]
pub struct VectorUpsertResponse {
    /// Set to `true` by `upsert_impl` whenever it returns a response at all;
    /// failures are reported as errors instead.
    pub success: bool,
}
93
/// Response payload produced by `delete_impl`.
#[derive(Debug, Serialize)]
pub struct VectorDeleteResponse {
    /// `true` when the delete reported at least one affected row (or a bare
    /// success result); `false` when zero rows were affected.
    pub success: bool,
}
98
/// Response payload shared by the index create/update/delete/compact operations.
#[derive(Debug, Serialize)]
pub struct VectorIndexResponse {
    /// Set to `true` by the index operations in this file whenever they return
    /// a response; failures are reported as errors instead.
    pub success: bool,
}
103
104pub async fn search(
105    Extension(state): Extension<Arc<ServerState>>,
106    Extension(ctx): Extension<RequestContext>,
107    Json(request): Json<VectorSearchRequest>,
108) -> Response {
109    match search_impl(state.clone(), request).await {
110        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
111        Err(err) => error_response(err, &ctx),
112    }
113}
114
115pub async fn upsert(
116    Extension(state): Extension<Arc<ServerState>>,
117    Extension(ctx): Extension<RequestContext>,
118    Json(request): Json<VectorUpsertRequest>,
119) -> Response {
120    match upsert_impl(state.clone(), request).await {
121        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
122        Err(err) => error_response(err, &ctx),
123    }
124}
125
126pub async fn delete(
127    Extension(state): Extension<Arc<ServerState>>,
128    Extension(ctx): Extension<RequestContext>,
129    Json(request): Json<VectorDeleteRequest>,
130) -> Response {
131    match delete_impl(state.clone(), request).await {
132        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
133        Err(err) => error_response(err, &ctx),
134    }
135}
136
137pub async fn index_create(
138    Extension(state): Extension<Arc<ServerState>>,
139    Extension(ctx): Extension<RequestContext>,
140    Json(request): Json<VectorIndexCreateRequest>,
141) -> Response {
142    match index_create_impl(state.clone(), request).await {
143        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
144        Err(err) => error_response(err, &ctx),
145    }
146}
147
148pub async fn index_update(
149    Extension(state): Extension<Arc<ServerState>>,
150    Extension(ctx): Extension<RequestContext>,
151    Json(request): Json<VectorIndexUpdateRequest>,
152) -> Response {
153    match index_update_impl(state.clone(), request).await {
154        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
155        Err(err) => error_response(err, &ctx),
156    }
157}
158
159pub async fn index_delete(
160    Extension(state): Extension<Arc<ServerState>>,
161    Extension(ctx): Extension<RequestContext>,
162    Json(request): Json<VectorIndexDeleteRequest>,
163) -> Response {
164    match index_delete_impl(state.clone(), request).await {
165        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
166        Err(err) => error_response(err, &ctx),
167    }
168}
169
170pub async fn index_compact(
171    Extension(state): Extension<Arc<ServerState>>,
172    Extension(ctx): Extension<RequestContext>,
173    Json(request): Json<VectorIndexCompactRequest>,
174) -> Response {
175    match index_compact_impl(state.clone(), request).await {
176        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
177        Err(err) => error_response(err, &ctx),
178    }
179}
180
/// Executes a vector similarity search and returns the matching rows.
///
/// Steps:
/// 1. Resolve the table, its vector column and that column's metric.
/// 2. Build `SELECT *, vector_similarity(...) AS score FROM <table>
///    ORDER BY <score expr> ASC|DESC LIMIT k`.
/// 3. Run the statement inside a SQL transaction, bounded by
///    `state.config.query_timeout`, then roll the transaction back.
/// 4. Convert each result row into a [`VectorSearchResult`].
///
/// Every exit path records the elapsed time and a success flag through
/// `state.metrics.record_query`.
///
/// # Errors
/// Propagates the error from table resolution or transaction start, returns
/// `ServerError::Sql` for execution/rollback failures, `ServerError::Timeout`
/// when the query exceeds the timeout, and `ServerError::BadRequest` when the
/// table has no primary key or the statement did not yield a query result.
pub(crate) async fn search_impl(
    state: Arc<ServerState>,
    request: VectorSearchRequest,
) -> Result<VectorSearchResponse> {
    let start = Instant::now();
    let (table_meta, vector_col, metric) =
        match resolve_vector_table(&state, &request.table, request.column.as_deref()) {
            Ok(values) => values,
            Err(err) => {
                state.metrics.record_query(start.elapsed(), false);
                return Err(err);
            }
        };
    let vector_literal = format_vector_literal(&request.vector);
    let score_expr = format!(
        "vector_similarity({}, {}, '{}')",
        quote_ident(&vector_col),
        vector_literal,
        metric_to_string(metric)
    );
    // L2 results are ordered ascending; every other metric is ordered descending.
    let order = match metric {
        alopex_sql::ast::ddl::VectorMetric::L2 => "ASC",
        _ => "DESC",
    };
    // The score expression is appended after `*`, so it is the last column of
    // every result row; the row conversion below relies on that.
    let sql = format!(
        "SELECT *, {score_expr} AS score FROM {} ORDER BY {score_expr} {order} LIMIT {}",
        quote_ident(&table_meta.name),
        request.k
    );

    let mut txn = match state.begin_sql_txn().await {
        Ok(txn) => txn,
        Err(err) => {
            state.metrics.record_query(start.elapsed(), false);
            return Err(err);
        }
    };
    // Outer `Err` means the timeout elapsed; inner `Err` is an execution error.
    let exec_result =
        match tokio::time::timeout(state.config.query_timeout, txn.async_execute(&sql)).await {
            Ok(result) => result.map_err(|err| ServerError::Sql(err.into())),
            Err(_) => Err(ServerError::Timeout("query timeout".into())),
        };
    // The search never commits: the transaction is rolled back on both paths.
    let exec_result = match exec_result {
        Ok(result) => {
            // A rollback failure after a successful query is surfaced as an error.
            if let Err(err) = txn.async_rollback().await {
                state.metrics.record_query(start.elapsed(), false);
                return Err(ServerError::Sql(err.into()));
            }
            result
        }
        Err(err) => {
            // Best-effort rollback; the original failure is what gets reported.
            let _ = txn.async_rollback().await;
            state.metrics.record_query(start.elapsed(), false);
            return Err(err);
        }
    };

    let results = match exec_result {
        alopex_sql::executor::ExecutionResult::Query(query) => {
            // NOTE(review): the primary key is only checked after the query has
            // already run.
            let pk_index = match primary_key_index(&table_meta) {
                Some(index) => index,
                None => {
                    state.metrics.record_query(start.elapsed(), false);
                    return Err(ServerError::BadRequest(
                        "table must have a primary key for vector search".into(),
                    ));
                }
            };
            query
                .rows
                .into_iter()
                // Rows whose score or primary key has an unexpected value type
                // are silently dropped from the response.
                .filter_map(|mut row| {
                    // Remove the trailing score column so `row` holds only the
                    // table's own columns.
                    let score_value = row.pop();
                    let score = match score_value {
                        Some(alopex_sql::storage::SqlValue::Float(v)) => v,
                        Some(alopex_sql::storage::SqlValue::Double(v)) => v as f32,
                        _ => return None,
                    };
                    // NOTE(review): `as u64` does not check the sign; presumably a
                    // negative key would wrap to a large value — confirm key range.
                    let id = match row.get(pk_index) {
                        Some(alopex_sql::storage::SqlValue::Integer(v)) => *v as u64,
                        Some(alopex_sql::storage::SqlValue::BigInt(v)) => *v as u64,
                        _ => return None,
                    };
                    Some(VectorSearchResult {
                        id,
                        distance: score,
                        row,
                    })
                })
                .collect()
        }
        other => {
            state.metrics.record_query(start.elapsed(), false);
            return Err(ServerError::BadRequest(format!(
                "vector search returned non-query result: {other:?}"
            )));
        }
    };

    state.metrics.record_query(start.elapsed(), true);
    Ok(VectorSearchResponse { results })
}
283
/// Inserts a `(primary key, vector)` row, or updates the vector of the
/// existing row when the insert hits a primary-key/unique violation.
///
/// Both statements run inside the same SQL transaction, each bounded by
/// `state.config.query_timeout`. The transaction is committed on success and
/// rolled back (best effort) on failure. Every exit path records the elapsed
/// time and a success flag through `state.metrics.record_query`.
///
/// # Errors
/// Propagates the error from table resolution or transaction start, returns
/// `ServerError::BadRequest` when the table has no usable primary key or the
/// statement yields an unexpected result kind, `ServerError::Timeout` when a
/// statement exceeds the timeout, and `ServerError::Sql` for execution or
/// commit failures.
pub(crate) async fn upsert_impl(
    state: Arc<ServerState>,
    request: VectorUpsertRequest,
) -> Result<VectorUpsertResponse> {
    let start = Instant::now();
    let (table_meta, vector_col, _) =
        match resolve_vector_table(&state, &request.table, request.column.as_deref()) {
            Ok(values) => values,
            Err(err) => {
                state.metrics.record_query(start.elapsed(), false);
                return Err(err);
            }
        };
    let pk_index = match primary_key_index(&table_meta) {
        Some(index) => index,
        None => {
            state.metrics.record_query(start.elapsed(), false);
            return Err(ServerError::BadRequest(
                "table must have a primary key for vector upsert".into(),
            ));
        }
    };
    let pk_name = match table_meta.columns.get(pk_index).map(|c| c.name.clone()) {
        Some(name) => name,
        None => {
            state.metrics.record_query(start.elapsed(), false);
            return Err(ServerError::BadRequest(
                "primary key column not found".into(),
            ));
        }
    };

    // Both statements are prepared up front; the UPDATE is only executed when
    // the INSERT is rejected as a duplicate.
    let vector_literal = format_vector_literal(&request.vector);
    let insert_sql = format!(
        "INSERT INTO {} ({}, {}) VALUES ({}, {})",
        quote_ident(&table_meta.name),
        quote_ident(&pk_name),
        quote_ident(&vector_col),
        request.id,
        vector_literal
    );
    let update_sql = format!(
        "UPDATE {} SET {} = {} WHERE {} = {}",
        quote_ident(&table_meta.name),
        quote_ident(&vector_col),
        vector_literal,
        quote_ident(&pk_name),
        request.id
    );

    let mut txn = match state.begin_sql_txn().await {
        Ok(txn) => txn,
        Err(err) => {
            state.metrics.record_query(start.elapsed(), false);
            return Err(err);
        }
    };
    // A timeout on the INSERT aborts immediately; an execution error is kept
    // as the raw executor error so it can be inspected below.
    let exec_result = match tokio::time::timeout(
        state.config.query_timeout,
        txn.async_execute(&insert_sql),
    )
    .await
    {
        Ok(result) => result,
        Err(_) => {
            let _ = txn.async_rollback().await;
            state.metrics.record_query(start.elapsed(), false);
            return Err(ServerError::Timeout("query timeout".into()));
        }
    };

    let exec_result = match exec_result {
        Ok(result) => Ok(result),
        Err(err) => {
            // Duplicate key: fall back to UPDATE within the same transaction.
            if is_unique_violation(&err) {
                match tokio::time::timeout(
                    state.config.query_timeout,
                    txn.async_execute(&update_sql),
                )
                .await
                {
                    Ok(result) => result.map_err(|err| ServerError::Sql(err.into())),
                    Err(_) => Err(ServerError::Timeout("query timeout".into())),
                }
            } else {
                Err(ServerError::Sql(err.into()))
            }
        }
    };

    let exec_result = match exec_result {
        Ok(result) => result,
        Err(err) => {
            // Best-effort rollback; the original failure is what gets reported.
            let _ = txn.async_rollback().await;
            state.metrics.record_query(start.elapsed(), false);
            return Err(err);
        }
    };

    if let Err(err) = txn.async_commit().await {
        state.metrics.record_query(start.elapsed(), false);
        return Err(ServerError::Sql(err.into()));
    }

    // NOTE(review): this check runs after the commit, so an unexpected result
    // kind is reported as an error even though the transaction was committed.
    let response = match exec_result {
        alopex_sql::executor::ExecutionResult::Success
        | alopex_sql::executor::ExecutionResult::RowsAffected(_) => {
            Ok(VectorUpsertResponse { success: true })
        }
        _ => {
            state.metrics.record_query(start.elapsed(), false);
            Err(ServerError::BadRequest(
                "vector upsert returned unexpected result".into(),
            ))
        }
    }?;

    state.metrics.record_query(start.elapsed(), true);
    Ok(response)
}
404
405pub(crate) async fn delete_impl(
406    state: Arc<ServerState>,
407    request: VectorDeleteRequest,
408) -> Result<VectorDeleteResponse> {
409    let start = Instant::now();
410    let (table_meta, _, _) =
411        resolve_vector_table(&state, &request.table, request.column.as_deref())?;
412    let pk_index = primary_key_index(&table_meta).ok_or_else(|| {
413        ServerError::BadRequest("table must have a primary key for vector delete".into())
414    })?;
415    let pk_name = table_meta
416        .columns
417        .get(pk_index)
418        .map(|c| c.name.clone())
419        .ok_or_else(|| ServerError::BadRequest("primary key column not found".into()))?;
420
421    let delete_sql = format!(
422        "DELETE FROM {} WHERE {} = {}",
423        quote_ident(&table_meta.name),
424        quote_ident(&pk_name),
425        request.id
426    );
427
428    let mut txn = state.begin_sql_txn().await?;
429    let exec_result = match tokio::time::timeout(
430        state.config.query_timeout,
431        txn.async_execute(&delete_sql),
432    )
433    .await
434    {
435        Ok(result) => result.map_err(|err| ServerError::Sql(err.into())),
436        Err(_) => Err(ServerError::Timeout("query timeout".into())),
437    };
438    let exec_result = match exec_result {
439        Ok(result) => result,
440        Err(err) => {
441            let _ = txn.async_rollback().await;
442            state.metrics.record_query(start.elapsed(), false);
443            return Err(err);
444        }
445    };
446    if let Err(err) = txn.async_commit().await {
447        state.metrics.record_query(start.elapsed(), false);
448        return Err(ServerError::Sql(err.into()));
449    }
450
451    let success = match exec_result {
452        alopex_sql::executor::ExecutionResult::RowsAffected(rows) => rows > 0,
453        alopex_sql::executor::ExecutionResult::Success => true,
454        _ => {
455            return Err(ServerError::BadRequest(
456                "vector delete returned unexpected result".into(),
457            ))
458        }
459    };
460
461    state.metrics.record_query(start.elapsed(), true);
462    Ok(VectorDeleteResponse { success })
463}
464
465pub(crate) async fn index_create_impl(
466    state: Arc<ServerState>,
467    request: VectorIndexCreateRequest,
468) -> Result<VectorIndexResponse> {
469    let start = Instant::now();
470    let sql = match build_create_index_sql(
471        &request.name,
472        &request.table,
473        &request.column,
474        request.method.as_deref(),
475        &request.options,
476        request.if_not_exists,
477    ) {
478        Ok(sql) => sql,
479        Err(err) => {
480            state.metrics.record_query(start.elapsed(), false);
481            return Err(err);
482        }
483    };
484    let response = execute_index_sql(state, start, &sql).await?;
485    Ok(response)
486}
487
488pub(crate) async fn index_update_impl(
489    state: Arc<ServerState>,
490    request: VectorIndexUpdateRequest,
491) -> Result<VectorIndexResponse> {
492    let start = Instant::now();
493    let create_sql = match build_create_index_sql(
494        &request.name,
495        &request.table,
496        &request.column,
497        request.method.as_deref(),
498        &request.options,
499        false,
500    ) {
501        Ok(sql) => sql,
502        Err(err) => {
503            state.metrics.record_query(start.elapsed(), false);
504            return Err(err);
505        }
506    };
507    let drop_sql = build_drop_index_sql(&request.name, true);
508    let sql = format!("{drop_sql}; {create_sql}");
509    let response = execute_index_sql(state, start, &sql).await?;
510    Ok(response)
511}
512
513pub(crate) async fn index_delete_impl(
514    state: Arc<ServerState>,
515    request: VectorIndexDeleteRequest,
516) -> Result<VectorIndexResponse> {
517    let start = Instant::now();
518    let sql = build_drop_index_sql(&request.name, request.if_exists);
519    let response = execute_index_sql(state, start, &sql).await?;
520    Ok(response)
521}
522
523pub(crate) async fn index_compact_impl(
524    state: Arc<ServerState>,
525    request: VectorIndexCompactRequest,
526) -> Result<VectorIndexResponse> {
527    let start = Instant::now();
528    let index = {
529        let guard = match state.catalog.read() {
530            Ok(guard) => guard,
531            Err(_) => {
532                state.metrics.record_query(start.elapsed(), false);
533                return Err(ServerError::Internal("catalog lock poisoned".into()));
534            }
535        };
536        match guard.get_index(&request.name).cloned() {
537            Some(index) => index,
538            None => {
539                state.metrics.record_query(start.elapsed(), false);
540                return Err(ServerError::NotFound("index not found".into()));
541            }
542        }
543    };
544    if !matches!(index.method, Some(alopex_sql::ast::ddl::IndexMethod::Hnsw)) {
545        state.metrics.record_query(start.elapsed(), false);
546        return Err(ServerError::BadRequest(
547            "index compact is only supported for HNSW".into(),
548        ));
549    }
550
551    let store = state.store.clone();
552    let name = request.name.clone();
553    let compacted = tokio::task::spawn_blocking(move || {
554        let mut txn = store.begin(TxnMode::ReadWrite)?;
555        let mut hnsw = HnswIndex::load(&name, &mut txn)?;
556        let result = hnsw.compact()?;
557        hnsw.save(&mut txn)?;
558        txn.commit_self()?;
559        Ok::<_, ServerError>(result)
560    })
561    .await
562    .map_err(|err| ServerError::Internal(err.to_string()));
563    let compacted = match compacted {
564        Ok(result) => result,
565        Err(err) => {
566            state.metrics.record_query(start.elapsed(), false);
567            return Err(err);
568        }
569    };
570
571    state.metrics.record_query(start.elapsed(), true);
572    let _ = compacted;
573    Ok(VectorIndexResponse { success: true })
574}
575
576async fn execute_index_sql(
577    state: Arc<ServerState>,
578    start: Instant,
579    sql: &str,
580) -> Result<VectorIndexResponse> {
581    let mut txn = state.begin_sql_txn().await?;
582    let exec_result =
583        match tokio::time::timeout(state.config.query_timeout, txn.async_execute(sql)).await {
584            Ok(result) => result.map_err(|err| ServerError::Sql(err.into())),
585            Err(_) => Err(ServerError::Timeout("query timeout".into())),
586        };
587    let exec_result = match exec_result {
588        Ok(result) => result,
589        Err(err) => {
590            let _ = txn.async_rollback().await;
591            state.metrics.record_query(start.elapsed(), false);
592            return Err(err);
593        }
594    };
595    if let Err(err) = txn.async_commit().await {
596        state.metrics.record_query(start.elapsed(), false);
597        return Err(ServerError::Sql(err.into()));
598    }
599
600    match exec_result {
601        alopex_sql::executor::ExecutionResult::Success
602        | alopex_sql::executor::ExecutionResult::RowsAffected(_) => {
603            state.metrics.record_query(start.elapsed(), true);
604            Ok(VectorIndexResponse { success: true })
605        }
606        _ => {
607            state.metrics.record_query(start.elapsed(), false);
608            Err(ServerError::BadRequest(
609                "vector index operation returned unexpected result".into(),
610            ))
611        }
612    }
613}
614
615fn build_create_index_sql(
616    name: &str,
617    table: &str,
618    column: &str,
619    method: Option<&str>,
620    options: &HashMap<String, String>,
621    if_not_exists: bool,
622) -> Result<String> {
623    if name.trim().is_empty() || table.trim().is_empty() || column.trim().is_empty() {
624        return Err(ServerError::BadRequest(
625            "index name, table, and column must not be empty".into(),
626        ));
627    }
628
629    let method_sql = match method.map(|m| m.to_ascii_lowercase()) {
630        None => String::new(),
631        Some(m) if m == "hnsw" => " USING HNSW".to_string(),
632        Some(m) if m == "btree" => " USING BTREE".to_string(),
633        Some(other) => {
634            return Err(ServerError::BadRequest(format!(
635                "unsupported index method: {other}"
636            )))
637        }
638    };
639    let options_sql = build_index_options_sql(options);
640    let if_not_exists = if if_not_exists { " IF NOT EXISTS" } else { "" };
641    Ok(format!(
642        "CREATE INDEX{if_not_exists} {} ON {} ({}){method_sql}{options_sql}",
643        quote_ident(name),
644        quote_ident(table),
645        quote_ident(column)
646    ))
647}
648
649fn build_drop_index_sql(name: &str, if_exists: bool) -> String {
650    let if_exists = if if_exists { " IF EXISTS" } else { "" };
651    format!("DROP INDEX{if_exists} {}", quote_ident(name))
652}
653
654fn build_index_options_sql(options: &HashMap<String, String>) -> String {
655    if options.is_empty() {
656        return String::new();
657    }
658    let mut items = Vec::with_capacity(options.len());
659    for (key, value) in options {
660        let quoted = value.replace('\'', "''");
661        items.push(format!("{}='{}'", quote_ident(key), quoted));
662    }
663    format!(" WITH ({})", items.join(", "))
664}
665
666fn resolve_vector_table(
667    state: &ServerState,
668    table: &str,
669    column: Option<&str>,
670) -> Result<(
671    alopex_sql::catalog::TableMetadata,
672    String,
673    alopex_sql::ast::ddl::VectorMetric,
674)> {
675    let catalog = state
676        .catalog
677        .read()
678        .map_err(|_| ServerError::Internal("catalog lock poisoned".into()))?;
679    let table_meta = catalog
680        .get_table(table)
681        .cloned()
682        .ok_or_else(|| ServerError::NotFound("table not found".into()))?;
683
684    let (vector_col, metric) = match column {
685        Some(name) => {
686            let col = table_meta
687                .columns
688                .iter()
689                .find(|c| c.name == name)
690                .ok_or_else(|| ServerError::NotFound("vector column not found".into()))?;
691            match &col.data_type {
692                alopex_sql::planner::ResolvedType::Vector { metric, .. } => {
693                    (col.name.clone(), *metric)
694                }
695                _ => {
696                    return Err(ServerError::BadRequest(
697                        "specified column is not a vector".into(),
698                    ))
699                }
700            }
701        }
702        None => {
703            let col = table_meta
704                .columns
705                .iter()
706                .find(|c| {
707                    matches!(
708                        c.data_type,
709                        alopex_sql::planner::ResolvedType::Vector { .. }
710                    )
711                })
712                .ok_or_else(|| ServerError::BadRequest("vector column not found".into()))?;
713            match &col.data_type {
714                alopex_sql::planner::ResolvedType::Vector { metric, .. } => {
715                    (col.name.clone(), *metric)
716                }
717                _ => return Err(ServerError::BadRequest("vector column not found".into())),
718            }
719        }
720    };
721
722    Ok((table_meta, vector_col, metric))
723}
724
725fn primary_key_index(table: &alopex_sql::catalog::TableMetadata) -> Option<usize> {
726    if let Some(keys) = table.primary_key.as_ref() {
727        if let Some(primary) = keys.first() {
728            return table.columns.iter().position(|col| col.name == *primary);
729        }
730    }
731    table.columns.iter().position(|col| col.primary_key)
732}
733
/// Renders a vector as a bracketed, comma-separated literal such as
/// `[1, 2.5, -3]`, using each component's `Display` form.
fn format_vector_literal(vector: &[f32]) -> String {
    let mut literal = String::from("[");
    for (position, component) in vector.iter().enumerate() {
        if position > 0 {
            literal.push_str(", ");
        }
        literal.push_str(&component.to_string());
    }
    literal.push(']');
    literal
}
742
743fn metric_to_string(metric: alopex_sql::ast::ddl::VectorMetric) -> &'static str {
744    match metric {
745        alopex_sql::ast::ddl::VectorMetric::Cosine => "cosine",
746        alopex_sql::ast::ddl::VectorMetric::L2 => "l2",
747        alopex_sql::ast::ddl::VectorMetric::Inner => "inner",
748    }
749}
750
/// Renders an identifier for inclusion in generated SQL.
///
/// Plain identifiers (a letter or `_` followed by letters, digits or `_`)
/// are returned unchanged, so statements built from ordinary names are
/// identical to what they were before. Anything else, including the empty
/// string and names containing spaces, punctuation or quotes, is wrapped in
/// double quotes with embedded double quotes doubled, so request-supplied
/// names cannot break out of the identifier position and inject SQL.
///
/// NOTE(review): assumes the SQL dialect accepts double-quoted identifiers —
/// TODO confirm. If it does not, non-plain names are rejected by the parser
/// rather than being interpreted as SQL.
fn quote_ident(ident: &str) -> String {
    let mut chars = ident.chars();
    let is_plain = match chars.next() {
        Some(first) => {
            (first.is_alphabetic() || first == '_')
                && chars.all(|c| c.is_alphanumeric() || c == '_')
        }
        None => false,
    };
    if is_plain {
        ident.to_string()
    } else {
        format!("\"{}\"", ident.replace('"', "\"\""))
    }
}
754
755fn is_unique_violation(err: &alopex_sql::executor::ExecutorError) -> bool {
756    use alopex_sql::executor::ConstraintViolation;
757    use alopex_sql::executor::ExecutorError;
758
759    matches!(
760        err,
761        ExecutorError::ConstraintViolation(ConstraintViolation::PrimaryKey { .. })
762            | ExecutorError::ConstraintViolation(ConstraintViolation::Unique { .. })
763    )
764}
765
/// Serde default for `VectorSearchRequest::k`.
fn default_k() -> usize {
    const DEFAULT_RESULT_COUNT: usize = 10;
    DEFAULT_RESULT_COUNT
}