1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Instant;
4
5use alopex_core::kv::{KVStore, KVTransaction};
6use alopex_core::types::TxnMode;
7use alopex_core::vector::hnsw::HnswIndex;
8use alopex_sql::storage::AsyncSqlTransaction;
9use axum::extract::Extension;
10use axum::response::Response;
11use axum::Json;
12use serde::{Deserialize, Serialize};
13
14use crate::error::{Result, ServerError};
15use crate::http::{error_response, json_response, RequestContext};
16use crate::server::ServerState;
17
18#[derive(Debug, Deserialize)]
19pub struct VectorSearchRequest {
20 pub table: String,
21 pub vector: Vec<f32>,
22 #[serde(default = "default_k")]
23 pub k: usize,
24 pub index: Option<String>,
25 pub column: Option<String>,
26}
27
28#[derive(Debug, Deserialize)]
29pub struct VectorUpsertRequest {
30 pub table: String,
31 pub id: u64,
32 pub vector: Vec<f32>,
33 pub column: Option<String>,
34}
35
36#[derive(Debug, Deserialize)]
37pub struct VectorDeleteRequest {
38 pub table: String,
39 pub id: u64,
40 pub column: Option<String>,
41}
42
43#[derive(Debug, Deserialize)]
44pub struct VectorIndexCreateRequest {
45 pub name: String,
46 pub table: String,
47 pub column: String,
48 pub method: Option<String>,
49 #[serde(default)]
50 pub options: HashMap<String, String>,
51 #[serde(default)]
52 pub if_not_exists: bool,
53}
54
55#[derive(Debug, Deserialize)]
56pub struct VectorIndexUpdateRequest {
57 pub name: String,
58 pub table: String,
59 pub column: String,
60 pub method: Option<String>,
61 #[serde(default)]
62 pub options: HashMap<String, String>,
63}
64
65#[derive(Debug, Deserialize)]
66pub struct VectorIndexDeleteRequest {
67 pub name: String,
68 #[serde(default)]
69 pub if_exists: bool,
70}
71
72#[derive(Debug, Deserialize)]
73pub struct VectorIndexCompactRequest {
74 pub name: String,
75}
76
77#[derive(Debug, Serialize)]
78pub struct VectorSearchResult {
79 pub id: u64,
80 pub distance: f32,
81 pub row: Vec<alopex_sql::storage::SqlValue>,
82}
83
84#[derive(Debug, Serialize)]
85pub struct VectorSearchResponse {
86 pub results: Vec<VectorSearchResult>,
87}
88
89#[derive(Debug, Serialize)]
90pub struct VectorUpsertResponse {
91 pub success: bool,
92}
93
94#[derive(Debug, Serialize)]
95pub struct VectorDeleteResponse {
96 pub success: bool,
97}
98
99#[derive(Debug, Serialize)]
100pub struct VectorIndexResponse {
101 pub success: bool,
102}
103
104pub async fn search(
105 Extension(state): Extension<Arc<ServerState>>,
106 Extension(ctx): Extension<RequestContext>,
107 Json(request): Json<VectorSearchRequest>,
108) -> Response {
109 match search_impl(state.clone(), request).await {
110 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
111 Err(err) => error_response(err, &ctx),
112 }
113}
114
115pub async fn upsert(
116 Extension(state): Extension<Arc<ServerState>>,
117 Extension(ctx): Extension<RequestContext>,
118 Json(request): Json<VectorUpsertRequest>,
119) -> Response {
120 match upsert_impl(state.clone(), request).await {
121 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
122 Err(err) => error_response(err, &ctx),
123 }
124}
125
126pub async fn delete(
127 Extension(state): Extension<Arc<ServerState>>,
128 Extension(ctx): Extension<RequestContext>,
129 Json(request): Json<VectorDeleteRequest>,
130) -> Response {
131 match delete_impl(state.clone(), request).await {
132 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
133 Err(err) => error_response(err, &ctx),
134 }
135}
136
137pub async fn index_create(
138 Extension(state): Extension<Arc<ServerState>>,
139 Extension(ctx): Extension<RequestContext>,
140 Json(request): Json<VectorIndexCreateRequest>,
141) -> Response {
142 match index_create_impl(state.clone(), request).await {
143 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
144 Err(err) => error_response(err, &ctx),
145 }
146}
147
148pub async fn index_update(
149 Extension(state): Extension<Arc<ServerState>>,
150 Extension(ctx): Extension<RequestContext>,
151 Json(request): Json<VectorIndexUpdateRequest>,
152) -> Response {
153 match index_update_impl(state.clone(), request).await {
154 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
155 Err(err) => error_response(err, &ctx),
156 }
157}
158
159pub async fn index_delete(
160 Extension(state): Extension<Arc<ServerState>>,
161 Extension(ctx): Extension<RequestContext>,
162 Json(request): Json<VectorIndexDeleteRequest>,
163) -> Response {
164 match index_delete_impl(state.clone(), request).await {
165 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
166 Err(err) => error_response(err, &ctx),
167 }
168}
169
170pub async fn index_compact(
171 Extension(state): Extension<Arc<ServerState>>,
172 Extension(ctx): Extension<RequestContext>,
173 Json(request): Json<VectorIndexCompactRequest>,
174) -> Response {
175 match index_compact_impl(state.clone(), request).await {
176 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
177 Err(err) => error_response(err, &ctx),
178 }
179}
180
181pub(crate) async fn search_impl(
182 state: Arc<ServerState>,
183 request: VectorSearchRequest,
184) -> Result<VectorSearchResponse> {
185 let start = Instant::now();
186 let (table_meta, vector_col, metric) =
187 match resolve_vector_table(&state, &request.table, request.column.as_deref()) {
188 Ok(values) => values,
189 Err(err) => {
190 state.metrics.record_query(start.elapsed(), false);
191 return Err(err);
192 }
193 };
194 let vector_literal = format_vector_literal(&request.vector);
195 let score_expr = format!(
196 "vector_similarity({}, {}, '{}')",
197 quote_ident(&vector_col),
198 vector_literal,
199 metric_to_string(metric)
200 );
201 let order = match metric {
202 alopex_sql::ast::ddl::VectorMetric::L2 => "ASC",
203 _ => "DESC",
204 };
205 let sql = format!(
206 "SELECT *, {score_expr} AS score FROM {} ORDER BY {score_expr} {order} LIMIT {}",
207 quote_ident(&table_meta.name),
208 request.k
209 );
210
211 let mut txn = match state.begin_sql_txn().await {
212 Ok(txn) => txn,
213 Err(err) => {
214 state.metrics.record_query(start.elapsed(), false);
215 return Err(err);
216 }
217 };
218 let exec_result =
219 match tokio::time::timeout(state.config.query_timeout, txn.async_execute(&sql)).await {
220 Ok(result) => result.map_err(|err| ServerError::Sql(err.into())),
221 Err(_) => Err(ServerError::Timeout("query timeout".into())),
222 };
223 let exec_result = match exec_result {
224 Ok(result) => {
225 if let Err(err) = txn.async_rollback().await {
226 state.metrics.record_query(start.elapsed(), false);
227 return Err(ServerError::Sql(err.into()));
228 }
229 result
230 }
231 Err(err) => {
232 let _ = txn.async_rollback().await;
233 state.metrics.record_query(start.elapsed(), false);
234 return Err(err);
235 }
236 };
237
238 let results = match exec_result {
239 alopex_sql::executor::ExecutionResult::Query(query) => {
240 let pk_index = match primary_key_index(&table_meta) {
241 Some(index) => index,
242 None => {
243 state.metrics.record_query(start.elapsed(), false);
244 return Err(ServerError::BadRequest(
245 "table must have a primary key for vector search".into(),
246 ));
247 }
248 };
249 query
250 .rows
251 .into_iter()
252 .filter_map(|mut row| {
253 let score_value = row.pop();
254 let score = match score_value {
255 Some(alopex_sql::storage::SqlValue::Float(v)) => v,
256 Some(alopex_sql::storage::SqlValue::Double(v)) => v as f32,
257 _ => return None,
258 };
259 let id = match row.get(pk_index) {
260 Some(alopex_sql::storage::SqlValue::Integer(v)) => *v as u64,
261 Some(alopex_sql::storage::SqlValue::BigInt(v)) => *v as u64,
262 _ => return None,
263 };
264 Some(VectorSearchResult {
265 id,
266 distance: score,
267 row,
268 })
269 })
270 .collect()
271 }
272 other => {
273 state.metrics.record_query(start.elapsed(), false);
274 return Err(ServerError::BadRequest(format!(
275 "vector search returned non-query result: {other:?}"
276 )));
277 }
278 };
279
280 state.metrics.record_query(start.elapsed(), true);
281 Ok(VectorSearchResponse { results })
282}
283
284pub(crate) async fn upsert_impl(
285 state: Arc<ServerState>,
286 request: VectorUpsertRequest,
287) -> Result<VectorUpsertResponse> {
288 let start = Instant::now();
289 state.lifecycle_state.check_write_allowed()?;
290 let (table_meta, vector_col, _) =
291 match resolve_vector_table(&state, &request.table, request.column.as_deref()) {
292 Ok(values) => values,
293 Err(err) => {
294 state.metrics.record_query(start.elapsed(), false);
295 return Err(err);
296 }
297 };
298 let pk_index = match primary_key_index(&table_meta) {
299 Some(index) => index,
300 None => {
301 state.metrics.record_query(start.elapsed(), false);
302 return Err(ServerError::BadRequest(
303 "table must have a primary key for vector upsert".into(),
304 ));
305 }
306 };
307 let pk_name = match table_meta.columns.get(pk_index).map(|c| c.name.clone()) {
308 Some(name) => name,
309 None => {
310 state.metrics.record_query(start.elapsed(), false);
311 return Err(ServerError::BadRequest(
312 "primary key column not found".into(),
313 ));
314 }
315 };
316
317 let vector_literal = format_vector_literal(&request.vector);
318 let insert_sql = format!(
319 "INSERT INTO {} ({}, {}) VALUES ({}, {})",
320 quote_ident(&table_meta.name),
321 quote_ident(&pk_name),
322 quote_ident(&vector_col),
323 request.id,
324 vector_literal
325 );
326 let update_sql = format!(
327 "UPDATE {} SET {} = {} WHERE {} = {}",
328 quote_ident(&table_meta.name),
329 quote_ident(&vector_col),
330 vector_literal,
331 quote_ident(&pk_name),
332 request.id
333 );
334
335 let mut txn = match state.begin_sql_txn().await {
336 Ok(txn) => txn,
337 Err(err) => {
338 state.metrics.record_query(start.elapsed(), false);
339 return Err(err);
340 }
341 };
342 let exec_result = match tokio::time::timeout(
343 state.config.query_timeout,
344 txn.async_execute(&insert_sql),
345 )
346 .await
347 {
348 Ok(result) => result,
349 Err(_) => {
350 let _ = txn.async_rollback().await;
351 state.metrics.record_query(start.elapsed(), false);
352 return Err(ServerError::Timeout("query timeout".into()));
353 }
354 };
355
356 let exec_result = match exec_result {
357 Ok(result) => Ok(result),
358 Err(err) => {
359 if is_unique_violation(&err) {
360 match tokio::time::timeout(
361 state.config.query_timeout,
362 txn.async_execute(&update_sql),
363 )
364 .await
365 {
366 Ok(result) => result.map_err(|err| ServerError::Sql(err.into())),
367 Err(_) => Err(ServerError::Timeout("query timeout".into())),
368 }
369 } else {
370 Err(ServerError::Sql(err.into()))
371 }
372 }
373 };
374
375 let exec_result = match exec_result {
376 Ok(result) => result,
377 Err(err) => {
378 let _ = txn.async_rollback().await;
379 state.metrics.record_query(start.elapsed(), false);
380 return Err(err);
381 }
382 };
383
384 if let Err(err) = txn.async_commit().await {
385 state.metrics.record_query(start.elapsed(), false);
386 return Err(ServerError::Sql(err.into()));
387 }
388
389 let response = match exec_result {
390 alopex_sql::executor::ExecutionResult::Success
391 | alopex_sql::executor::ExecutionResult::RowsAffected(_) => {
392 Ok(VectorUpsertResponse { success: true })
393 }
394 _ => {
395 state.metrics.record_query(start.elapsed(), false);
396 Err(ServerError::BadRequest(
397 "vector upsert returned unexpected result".into(),
398 ))
399 }
400 }?;
401
402 state.metrics.record_query(start.elapsed(), true);
403 Ok(response)
404}
405
406pub(crate) async fn delete_impl(
407 state: Arc<ServerState>,
408 request: VectorDeleteRequest,
409) -> Result<VectorDeleteResponse> {
410 let start = Instant::now();
411 state.lifecycle_state.check_write_allowed()?;
412 let (table_meta, _, _) =
413 resolve_vector_table(&state, &request.table, request.column.as_deref())?;
414 let pk_index = primary_key_index(&table_meta).ok_or_else(|| {
415 ServerError::BadRequest("table must have a primary key for vector delete".into())
416 })?;
417 let pk_name = table_meta
418 .columns
419 .get(pk_index)
420 .map(|c| c.name.clone())
421 .ok_or_else(|| ServerError::BadRequest("primary key column not found".into()))?;
422
423 let delete_sql = format!(
424 "DELETE FROM {} WHERE {} = {}",
425 quote_ident(&table_meta.name),
426 quote_ident(&pk_name),
427 request.id
428 );
429
430 let mut txn = state.begin_sql_txn().await?;
431 let exec_result = match tokio::time::timeout(
432 state.config.query_timeout,
433 txn.async_execute(&delete_sql),
434 )
435 .await
436 {
437 Ok(result) => result.map_err(|err| ServerError::Sql(err.into())),
438 Err(_) => Err(ServerError::Timeout("query timeout".into())),
439 };
440 let exec_result = match exec_result {
441 Ok(result) => result,
442 Err(err) => {
443 let _ = txn.async_rollback().await;
444 state.metrics.record_query(start.elapsed(), false);
445 return Err(err);
446 }
447 };
448 if let Err(err) = txn.async_commit().await {
449 state.metrics.record_query(start.elapsed(), false);
450 return Err(ServerError::Sql(err.into()));
451 }
452
453 let success = match exec_result {
454 alopex_sql::executor::ExecutionResult::RowsAffected(rows) => rows > 0,
455 alopex_sql::executor::ExecutionResult::Success => true,
456 _ => {
457 return Err(ServerError::BadRequest(
458 "vector delete returned unexpected result".into(),
459 ))
460 }
461 };
462
463 state.metrics.record_query(start.elapsed(), true);
464 Ok(VectorDeleteResponse { success })
465}
466
467pub(crate) async fn index_create_impl(
468 state: Arc<ServerState>,
469 request: VectorIndexCreateRequest,
470) -> Result<VectorIndexResponse> {
471 let start = Instant::now();
472 state.lifecycle_state.check_write_allowed()?;
473 let sql = match build_create_index_sql(
474 &request.name,
475 &request.table,
476 &request.column,
477 request.method.as_deref(),
478 &request.options,
479 request.if_not_exists,
480 ) {
481 Ok(sql) => sql,
482 Err(err) => {
483 state.metrics.record_query(start.elapsed(), false);
484 return Err(err);
485 }
486 };
487 let response = execute_index_sql(state, start, &sql).await?;
488 Ok(response)
489}
490
491pub(crate) async fn index_update_impl(
492 state: Arc<ServerState>,
493 request: VectorIndexUpdateRequest,
494) -> Result<VectorIndexResponse> {
495 let start = Instant::now();
496 state.lifecycle_state.check_write_allowed()?;
497 let create_sql = match build_create_index_sql(
498 &request.name,
499 &request.table,
500 &request.column,
501 request.method.as_deref(),
502 &request.options,
503 false,
504 ) {
505 Ok(sql) => sql,
506 Err(err) => {
507 state.metrics.record_query(start.elapsed(), false);
508 return Err(err);
509 }
510 };
511 let drop_sql = build_drop_index_sql(&request.name, true);
512 let sql = format!("{drop_sql}; {create_sql}");
513 let response = execute_index_sql(state, start, &sql).await?;
514 Ok(response)
515}
516
517pub(crate) async fn index_delete_impl(
518 state: Arc<ServerState>,
519 request: VectorIndexDeleteRequest,
520) -> Result<VectorIndexResponse> {
521 let start = Instant::now();
522 state.lifecycle_state.check_write_allowed()?;
523 let sql = build_drop_index_sql(&request.name, request.if_exists);
524 let response = execute_index_sql(state, start, &sql).await?;
525 Ok(response)
526}
527
528pub(crate) async fn index_compact_impl(
529 state: Arc<ServerState>,
530 request: VectorIndexCompactRequest,
531) -> Result<VectorIndexResponse> {
532 let start = Instant::now();
533 state.lifecycle_state.check_write_allowed()?;
534 let index = {
535 let guard = match state.catalog.read() {
536 Ok(guard) => guard,
537 Err(_) => {
538 state.metrics.record_query(start.elapsed(), false);
539 return Err(ServerError::Internal("catalog lock poisoned".into()));
540 }
541 };
542 match guard.get_index(&request.name).cloned() {
543 Some(index) => index,
544 None => {
545 state.metrics.record_query(start.elapsed(), false);
546 return Err(ServerError::NotFound("index not found".into()));
547 }
548 }
549 };
550 if !matches!(index.method, Some(alopex_sql::ast::ddl::IndexMethod::Hnsw)) {
551 state.metrics.record_query(start.elapsed(), false);
552 return Err(ServerError::BadRequest(
553 "index compact is only supported for HNSW".into(),
554 ));
555 }
556
557 let store = state.store.clone();
558 let name = request.name.clone();
559 let compacted = tokio::task::spawn_blocking(move || {
560 let mut txn = store.begin(TxnMode::ReadWrite)?;
561 let mut hnsw = HnswIndex::load(&name, &mut txn)?;
562 let result = hnsw.compact()?;
563 hnsw.save(&mut txn)?;
564 txn.commit_self()?;
565 Ok::<_, ServerError>(result)
566 })
567 .await
568 .map_err(|err| ServerError::Internal(err.to_string()));
569 let compacted = match compacted {
570 Ok(result) => result,
571 Err(err) => {
572 state.metrics.record_query(start.elapsed(), false);
573 return Err(err);
574 }
575 };
576
577 state.metrics.record_query(start.elapsed(), true);
578 let _ = compacted;
579 Ok(VectorIndexResponse { success: true })
580}
581
582async fn execute_index_sql(
583 state: Arc<ServerState>,
584 start: Instant,
585 sql: &str,
586) -> Result<VectorIndexResponse> {
587 let mut txn = state.begin_sql_txn().await?;
588 let exec_result =
589 match tokio::time::timeout(state.config.query_timeout, txn.async_execute(sql)).await {
590 Ok(result) => result.map_err(|err| ServerError::Sql(err.into())),
591 Err(_) => Err(ServerError::Timeout("query timeout".into())),
592 };
593 let exec_result = match exec_result {
594 Ok(result) => result,
595 Err(err) => {
596 let _ = txn.async_rollback().await;
597 state.metrics.record_query(start.elapsed(), false);
598 return Err(err);
599 }
600 };
601 if let Err(err) = txn.async_commit().await {
602 state.metrics.record_query(start.elapsed(), false);
603 return Err(ServerError::Sql(err.into()));
604 }
605
606 match exec_result {
607 alopex_sql::executor::ExecutionResult::Success
608 | alopex_sql::executor::ExecutionResult::RowsAffected(_) => {
609 state.metrics.record_query(start.elapsed(), true);
610 Ok(VectorIndexResponse { success: true })
611 }
612 _ => {
613 state.metrics.record_query(start.elapsed(), false);
614 Err(ServerError::BadRequest(
615 "vector index operation returned unexpected result".into(),
616 ))
617 }
618 }
619}
620
621fn build_create_index_sql(
622 name: &str,
623 table: &str,
624 column: &str,
625 method: Option<&str>,
626 options: &HashMap<String, String>,
627 if_not_exists: bool,
628) -> Result<String> {
629 if name.trim().is_empty() || table.trim().is_empty() || column.trim().is_empty() {
630 return Err(ServerError::BadRequest(
631 "index name, table, and column must not be empty".into(),
632 ));
633 }
634
635 let method_sql = match method.map(|m| m.to_ascii_lowercase()) {
636 None => String::new(),
637 Some(m) if m == "hnsw" => " USING HNSW".to_string(),
638 Some(m) if m == "btree" => " USING BTREE".to_string(),
639 Some(other) => {
640 return Err(ServerError::BadRequest(format!(
641 "unsupported index method: {other}"
642 )))
643 }
644 };
645 let options_sql = build_index_options_sql(options);
646 let if_not_exists = if if_not_exists { " IF NOT EXISTS" } else { "" };
647 Ok(format!(
648 "CREATE INDEX{if_not_exists} {} ON {} ({}){method_sql}{options_sql}",
649 quote_ident(name),
650 quote_ident(table),
651 quote_ident(column)
652 ))
653}
654
655fn build_drop_index_sql(name: &str, if_exists: bool) -> String {
656 let if_exists = if if_exists { " IF EXISTS" } else { "" };
657 format!("DROP INDEX{if_exists} {}", quote_ident(name))
658}
659
660fn build_index_options_sql(options: &HashMap<String, String>) -> String {
661 if options.is_empty() {
662 return String::new();
663 }
664 let mut items = Vec::with_capacity(options.len());
665 for (key, value) in options {
666 let quoted = value.replace('\'', "''");
667 items.push(format!("{}='{}'", quote_ident(key), quoted));
668 }
669 format!(" WITH ({})", items.join(", "))
670}
671
672fn resolve_vector_table(
673 state: &ServerState,
674 table: &str,
675 column: Option<&str>,
676) -> Result<(
677 alopex_sql::catalog::TableMetadata,
678 String,
679 alopex_sql::ast::ddl::VectorMetric,
680)> {
681 let catalog = state
682 .catalog
683 .read()
684 .map_err(|_| ServerError::Internal("catalog lock poisoned".into()))?;
685 let table_meta = catalog
686 .get_table(table)
687 .cloned()
688 .ok_or_else(|| ServerError::NotFound("table not found".into()))?;
689
690 let (vector_col, metric) = match column {
691 Some(name) => {
692 let col = table_meta
693 .columns
694 .iter()
695 .find(|c| c.name == name)
696 .ok_or_else(|| ServerError::NotFound("vector column not found".into()))?;
697 match &col.data_type {
698 alopex_sql::planner::ResolvedType::Vector { metric, .. } => {
699 (col.name.clone(), *metric)
700 }
701 _ => {
702 return Err(ServerError::BadRequest(
703 "specified column is not a vector".into(),
704 ))
705 }
706 }
707 }
708 None => {
709 let col = table_meta
710 .columns
711 .iter()
712 .find(|c| {
713 matches!(
714 c.data_type,
715 alopex_sql::planner::ResolvedType::Vector { .. }
716 )
717 })
718 .ok_or_else(|| ServerError::BadRequest("vector column not found".into()))?;
719 match &col.data_type {
720 alopex_sql::planner::ResolvedType::Vector { metric, .. } => {
721 (col.name.clone(), *metric)
722 }
723 _ => return Err(ServerError::BadRequest("vector column not found".into())),
724 }
725 }
726 };
727
728 Ok((table_meta, vector_col, metric))
729}
730
731fn primary_key_index(table: &alopex_sql::catalog::TableMetadata) -> Option<usize> {
732 if let Some(keys) = table.primary_key.as_ref() {
733 if let Some(primary) = keys.first() {
734 return table.columns.iter().position(|col| col.name == *primary);
735 }
736 }
737 table.columns.iter().position(|col| col.primary_key)
738}
739
740fn format_vector_literal(vector: &[f32]) -> String {
741 let body = vector
742 .iter()
743 .map(|v| format!("{v}"))
744 .collect::<Vec<_>>()
745 .join(", ");
746 format!("[{body}]")
747}
748
749fn metric_to_string(metric: alopex_sql::ast::ddl::VectorMetric) -> &'static str {
750 match metric {
751 alopex_sql::ast::ddl::VectorMetric::Cosine => "cosine",
752 alopex_sql::ast::ddl::VectorMetric::L2 => "l2",
753 alopex_sql::ast::ddl::VectorMetric::Inner => "inner",
754 }
755}
756
757fn quote_ident(ident: &str) -> String {
758 ident.to_string()
759}
760
761fn is_unique_violation(err: &alopex_sql::executor::ExecutorError) -> bool {
762 use alopex_sql::executor::ConstraintViolation;
763 use alopex_sql::executor::ExecutorError;
764
765 matches!(
766 err,
767 ExecutorError::ConstraintViolation(ConstraintViolation::PrimaryKey { .. })
768 | ExecutorError::ConstraintViolation(ConstraintViolation::Unique { .. })
769 )
770}
771
772fn default_k() -> usize {
773 10
774}