Skip to main content

alopex_server/http/
vector.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Instant;
4
5use alopex_core::kv::{KVStore, KVTransaction};
6use alopex_core::types::TxnMode;
7use alopex_core::vector::hnsw::HnswIndex;
8use alopex_sql::storage::AsyncSqlTransaction;
9use axum::extract::Extension;
10use axum::response::Response;
11use axum::Json;
12use serde::{Deserialize, Serialize};
13
14use crate::error::{Result, ServerError};
15use crate::http::{error_response, json_response, RequestContext};
16use crate::server::ServerState;
17
18#[derive(Debug, Deserialize)]
19pub struct VectorSearchRequest {
20    pub table: String,
21    pub vector: Vec<f32>,
22    #[serde(default = "default_k")]
23    pub k: usize,
24    pub index: Option<String>,
25    pub column: Option<String>,
26}
27
28#[derive(Debug, Deserialize)]
29pub struct VectorUpsertRequest {
30    pub table: String,
31    pub id: u64,
32    pub vector: Vec<f32>,
33    pub column: Option<String>,
34}
35
36#[derive(Debug, Deserialize)]
37pub struct VectorDeleteRequest {
38    pub table: String,
39    pub id: u64,
40    pub column: Option<String>,
41}
42
43#[derive(Debug, Deserialize)]
44pub struct VectorIndexCreateRequest {
45    pub name: String,
46    pub table: String,
47    pub column: String,
48    pub method: Option<String>,
49    #[serde(default)]
50    pub options: HashMap<String, String>,
51    #[serde(default)]
52    pub if_not_exists: bool,
53}
54
55#[derive(Debug, Deserialize)]
56pub struct VectorIndexUpdateRequest {
57    pub name: String,
58    pub table: String,
59    pub column: String,
60    pub method: Option<String>,
61    #[serde(default)]
62    pub options: HashMap<String, String>,
63}
64
65#[derive(Debug, Deserialize)]
66pub struct VectorIndexDeleteRequest {
67    pub name: String,
68    #[serde(default)]
69    pub if_exists: bool,
70}
71
72#[derive(Debug, Deserialize)]
73pub struct VectorIndexCompactRequest {
74    pub name: String,
75}
76
77#[derive(Debug, Serialize)]
78pub struct VectorSearchResult {
79    pub id: u64,
80    pub distance: f32,
81    pub row: Vec<alopex_sql::storage::SqlValue>,
82}
83
84#[derive(Debug, Serialize)]
85pub struct VectorSearchResponse {
86    pub results: Vec<VectorSearchResult>,
87}
88
89#[derive(Debug, Serialize)]
90pub struct VectorUpsertResponse {
91    pub success: bool,
92}
93
94#[derive(Debug, Serialize)]
95pub struct VectorDeleteResponse {
96    pub success: bool,
97}
98
99#[derive(Debug, Serialize)]
100pub struct VectorIndexResponse {
101    pub success: bool,
102}
103
104pub async fn search(
105    Extension(state): Extension<Arc<ServerState>>,
106    Extension(ctx): Extension<RequestContext>,
107    Json(request): Json<VectorSearchRequest>,
108) -> Response {
109    match search_impl(state.clone(), request).await {
110        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
111        Err(err) => error_response(err, &ctx),
112    }
113}
114
115pub async fn upsert(
116    Extension(state): Extension<Arc<ServerState>>,
117    Extension(ctx): Extension<RequestContext>,
118    Json(request): Json<VectorUpsertRequest>,
119) -> Response {
120    match upsert_impl(state.clone(), request).await {
121        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
122        Err(err) => error_response(err, &ctx),
123    }
124}
125
126pub async fn delete(
127    Extension(state): Extension<Arc<ServerState>>,
128    Extension(ctx): Extension<RequestContext>,
129    Json(request): Json<VectorDeleteRequest>,
130) -> Response {
131    match delete_impl(state.clone(), request).await {
132        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
133        Err(err) => error_response(err, &ctx),
134    }
135}
136
137pub async fn index_create(
138    Extension(state): Extension<Arc<ServerState>>,
139    Extension(ctx): Extension<RequestContext>,
140    Json(request): Json<VectorIndexCreateRequest>,
141) -> Response {
142    match index_create_impl(state.clone(), request).await {
143        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
144        Err(err) => error_response(err, &ctx),
145    }
146}
147
148pub async fn index_update(
149    Extension(state): Extension<Arc<ServerState>>,
150    Extension(ctx): Extension<RequestContext>,
151    Json(request): Json<VectorIndexUpdateRequest>,
152) -> Response {
153    match index_update_impl(state.clone(), request).await {
154        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
155        Err(err) => error_response(err, &ctx),
156    }
157}
158
159pub async fn index_delete(
160    Extension(state): Extension<Arc<ServerState>>,
161    Extension(ctx): Extension<RequestContext>,
162    Json(request): Json<VectorIndexDeleteRequest>,
163) -> Response {
164    match index_delete_impl(state.clone(), request).await {
165        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
166        Err(err) => error_response(err, &ctx),
167    }
168}
169
170pub async fn index_compact(
171    Extension(state): Extension<Arc<ServerState>>,
172    Extension(ctx): Extension<RequestContext>,
173    Json(request): Json<VectorIndexCompactRequest>,
174) -> Response {
175    match index_compact_impl(state.clone(), request).await {
176        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
177        Err(err) => error_response(err, &ctx),
178    }
179}
180
181pub(crate) async fn search_impl(
182    state: Arc<ServerState>,
183    request: VectorSearchRequest,
184) -> Result<VectorSearchResponse> {
185    let start = Instant::now();
186    let (table_meta, vector_col, metric) =
187        match resolve_vector_table(&state, &request.table, request.column.as_deref()) {
188            Ok(values) => values,
189            Err(err) => {
190                state.metrics.record_query(start.elapsed(), false);
191                return Err(err);
192            }
193        };
194    let vector_literal = format_vector_literal(&request.vector);
195    let score_expr = format!(
196        "vector_similarity({}, {}, '{}')",
197        quote_ident(&vector_col),
198        vector_literal,
199        metric_to_string(metric)
200    );
201    let order = match metric {
202        alopex_sql::ast::ddl::VectorMetric::L2 => "ASC",
203        _ => "DESC",
204    };
205    let sql = format!(
206        "SELECT *, {score_expr} AS score FROM {} ORDER BY {score_expr} {order} LIMIT {}",
207        quote_ident(&table_meta.name),
208        request.k
209    );
210
211    let mut txn = match state.begin_sql_txn().await {
212        Ok(txn) => txn,
213        Err(err) => {
214            state.metrics.record_query(start.elapsed(), false);
215            return Err(err);
216        }
217    };
218    let exec_result =
219        match tokio::time::timeout(state.config.query_timeout, txn.async_execute(&sql)).await {
220            Ok(result) => result.map_err(|err| ServerError::Sql(err.into())),
221            Err(_) => Err(ServerError::Timeout("query timeout".into())),
222        };
223    let exec_result = match exec_result {
224        Ok(result) => {
225            if let Err(err) = txn.async_rollback().await {
226                state.metrics.record_query(start.elapsed(), false);
227                return Err(ServerError::Sql(err.into()));
228            }
229            result
230        }
231        Err(err) => {
232            let _ = txn.async_rollback().await;
233            state.metrics.record_query(start.elapsed(), false);
234            return Err(err);
235        }
236    };
237
238    let results = match exec_result {
239        alopex_sql::executor::ExecutionResult::Query(query) => {
240            let pk_index = match primary_key_index(&table_meta) {
241                Some(index) => index,
242                None => {
243                    state.metrics.record_query(start.elapsed(), false);
244                    return Err(ServerError::BadRequest(
245                        "table must have a primary key for vector search".into(),
246                    ));
247                }
248            };
249            query
250                .rows
251                .into_iter()
252                .filter_map(|mut row| {
253                    let score_value = row.pop();
254                    let score = match score_value {
255                        Some(alopex_sql::storage::SqlValue::Float(v)) => v,
256                        Some(alopex_sql::storage::SqlValue::Double(v)) => v as f32,
257                        _ => return None,
258                    };
259                    let id = match row.get(pk_index) {
260                        Some(alopex_sql::storage::SqlValue::Integer(v)) => *v as u64,
261                        Some(alopex_sql::storage::SqlValue::BigInt(v)) => *v as u64,
262                        _ => return None,
263                    };
264                    Some(VectorSearchResult {
265                        id,
266                        distance: score,
267                        row,
268                    })
269                })
270                .collect()
271        }
272        other => {
273            state.metrics.record_query(start.elapsed(), false);
274            return Err(ServerError::BadRequest(format!(
275                "vector search returned non-query result: {other:?}"
276            )));
277        }
278    };
279
280    state.metrics.record_query(start.elapsed(), true);
281    Ok(VectorSearchResponse { results })
282}
283
284pub(crate) async fn upsert_impl(
285    state: Arc<ServerState>,
286    request: VectorUpsertRequest,
287) -> Result<VectorUpsertResponse> {
288    let start = Instant::now();
289    state.lifecycle_state.check_write_allowed()?;
290    let (table_meta, vector_col, _) =
291        match resolve_vector_table(&state, &request.table, request.column.as_deref()) {
292            Ok(values) => values,
293            Err(err) => {
294                state.metrics.record_query(start.elapsed(), false);
295                return Err(err);
296            }
297        };
298    let pk_index = match primary_key_index(&table_meta) {
299        Some(index) => index,
300        None => {
301            state.metrics.record_query(start.elapsed(), false);
302            return Err(ServerError::BadRequest(
303                "table must have a primary key for vector upsert".into(),
304            ));
305        }
306    };
307    let pk_name = match table_meta.columns.get(pk_index).map(|c| c.name.clone()) {
308        Some(name) => name,
309        None => {
310            state.metrics.record_query(start.elapsed(), false);
311            return Err(ServerError::BadRequest(
312                "primary key column not found".into(),
313            ));
314        }
315    };
316
317    let vector_literal = format_vector_literal(&request.vector);
318    let insert_sql = format!(
319        "INSERT INTO {} ({}, {}) VALUES ({}, {})",
320        quote_ident(&table_meta.name),
321        quote_ident(&pk_name),
322        quote_ident(&vector_col),
323        request.id,
324        vector_literal
325    );
326    let update_sql = format!(
327        "UPDATE {} SET {} = {} WHERE {} = {}",
328        quote_ident(&table_meta.name),
329        quote_ident(&vector_col),
330        vector_literal,
331        quote_ident(&pk_name),
332        request.id
333    );
334
335    let mut txn = match state.begin_sql_txn().await {
336        Ok(txn) => txn,
337        Err(err) => {
338            state.metrics.record_query(start.elapsed(), false);
339            return Err(err);
340        }
341    };
342    let exec_result = match tokio::time::timeout(
343        state.config.query_timeout,
344        txn.async_execute(&insert_sql),
345    )
346    .await
347    {
348        Ok(result) => result,
349        Err(_) => {
350            let _ = txn.async_rollback().await;
351            state.metrics.record_query(start.elapsed(), false);
352            return Err(ServerError::Timeout("query timeout".into()));
353        }
354    };
355
356    let exec_result = match exec_result {
357        Ok(result) => Ok(result),
358        Err(err) => {
359            if is_unique_violation(&err) {
360                match tokio::time::timeout(
361                    state.config.query_timeout,
362                    txn.async_execute(&update_sql),
363                )
364                .await
365                {
366                    Ok(result) => result.map_err(|err| ServerError::Sql(err.into())),
367                    Err(_) => Err(ServerError::Timeout("query timeout".into())),
368                }
369            } else {
370                Err(ServerError::Sql(err.into()))
371            }
372        }
373    };
374
375    let exec_result = match exec_result {
376        Ok(result) => result,
377        Err(err) => {
378            let _ = txn.async_rollback().await;
379            state.metrics.record_query(start.elapsed(), false);
380            return Err(err);
381        }
382    };
383
384    if let Err(err) = txn.async_commit().await {
385        state.metrics.record_query(start.elapsed(), false);
386        return Err(ServerError::Sql(err.into()));
387    }
388
389    let response = match exec_result {
390        alopex_sql::executor::ExecutionResult::Success
391        | alopex_sql::executor::ExecutionResult::RowsAffected(_) => {
392            Ok(VectorUpsertResponse { success: true })
393        }
394        _ => {
395            state.metrics.record_query(start.elapsed(), false);
396            Err(ServerError::BadRequest(
397                "vector upsert returned unexpected result".into(),
398            ))
399        }
400    }?;
401
402    state.metrics.record_query(start.elapsed(), true);
403    Ok(response)
404}
405
406pub(crate) async fn delete_impl(
407    state: Arc<ServerState>,
408    request: VectorDeleteRequest,
409) -> Result<VectorDeleteResponse> {
410    let start = Instant::now();
411    state.lifecycle_state.check_write_allowed()?;
412    let (table_meta, _, _) =
413        resolve_vector_table(&state, &request.table, request.column.as_deref())?;
414    let pk_index = primary_key_index(&table_meta).ok_or_else(|| {
415        ServerError::BadRequest("table must have a primary key for vector delete".into())
416    })?;
417    let pk_name = table_meta
418        .columns
419        .get(pk_index)
420        .map(|c| c.name.clone())
421        .ok_or_else(|| ServerError::BadRequest("primary key column not found".into()))?;
422
423    let delete_sql = format!(
424        "DELETE FROM {} WHERE {} = {}",
425        quote_ident(&table_meta.name),
426        quote_ident(&pk_name),
427        request.id
428    );
429
430    let mut txn = state.begin_sql_txn().await?;
431    let exec_result = match tokio::time::timeout(
432        state.config.query_timeout,
433        txn.async_execute(&delete_sql),
434    )
435    .await
436    {
437        Ok(result) => result.map_err(|err| ServerError::Sql(err.into())),
438        Err(_) => Err(ServerError::Timeout("query timeout".into())),
439    };
440    let exec_result = match exec_result {
441        Ok(result) => result,
442        Err(err) => {
443            let _ = txn.async_rollback().await;
444            state.metrics.record_query(start.elapsed(), false);
445            return Err(err);
446        }
447    };
448    if let Err(err) = txn.async_commit().await {
449        state.metrics.record_query(start.elapsed(), false);
450        return Err(ServerError::Sql(err.into()));
451    }
452
453    let success = match exec_result {
454        alopex_sql::executor::ExecutionResult::RowsAffected(rows) => rows > 0,
455        alopex_sql::executor::ExecutionResult::Success => true,
456        _ => {
457            return Err(ServerError::BadRequest(
458                "vector delete returned unexpected result".into(),
459            ))
460        }
461    };
462
463    state.metrics.record_query(start.elapsed(), true);
464    Ok(VectorDeleteResponse { success })
465}
466
467pub(crate) async fn index_create_impl(
468    state: Arc<ServerState>,
469    request: VectorIndexCreateRequest,
470) -> Result<VectorIndexResponse> {
471    let start = Instant::now();
472    state.lifecycle_state.check_write_allowed()?;
473    let sql = match build_create_index_sql(
474        &request.name,
475        &request.table,
476        &request.column,
477        request.method.as_deref(),
478        &request.options,
479        request.if_not_exists,
480    ) {
481        Ok(sql) => sql,
482        Err(err) => {
483            state.metrics.record_query(start.elapsed(), false);
484            return Err(err);
485        }
486    };
487    let response = execute_index_sql(state, start, &sql).await?;
488    Ok(response)
489}
490
491pub(crate) async fn index_update_impl(
492    state: Arc<ServerState>,
493    request: VectorIndexUpdateRequest,
494) -> Result<VectorIndexResponse> {
495    let start = Instant::now();
496    state.lifecycle_state.check_write_allowed()?;
497    let create_sql = match build_create_index_sql(
498        &request.name,
499        &request.table,
500        &request.column,
501        request.method.as_deref(),
502        &request.options,
503        false,
504    ) {
505        Ok(sql) => sql,
506        Err(err) => {
507            state.metrics.record_query(start.elapsed(), false);
508            return Err(err);
509        }
510    };
511    let drop_sql = build_drop_index_sql(&request.name, true);
512    let sql = format!("{drop_sql}; {create_sql}");
513    let response = execute_index_sql(state, start, &sql).await?;
514    Ok(response)
515}
516
517pub(crate) async fn index_delete_impl(
518    state: Arc<ServerState>,
519    request: VectorIndexDeleteRequest,
520) -> Result<VectorIndexResponse> {
521    let start = Instant::now();
522    state.lifecycle_state.check_write_allowed()?;
523    let sql = build_drop_index_sql(&request.name, request.if_exists);
524    let response = execute_index_sql(state, start, &sql).await?;
525    Ok(response)
526}
527
528pub(crate) async fn index_compact_impl(
529    state: Arc<ServerState>,
530    request: VectorIndexCompactRequest,
531) -> Result<VectorIndexResponse> {
532    let start = Instant::now();
533    state.lifecycle_state.check_write_allowed()?;
534    let index = {
535        let guard = match state.catalog.read() {
536            Ok(guard) => guard,
537            Err(_) => {
538                state.metrics.record_query(start.elapsed(), false);
539                return Err(ServerError::Internal("catalog lock poisoned".into()));
540            }
541        };
542        match guard.get_index(&request.name).cloned() {
543            Some(index) => index,
544            None => {
545                state.metrics.record_query(start.elapsed(), false);
546                return Err(ServerError::NotFound("index not found".into()));
547            }
548        }
549    };
550    if !matches!(index.method, Some(alopex_sql::ast::ddl::IndexMethod::Hnsw)) {
551        state.metrics.record_query(start.elapsed(), false);
552        return Err(ServerError::BadRequest(
553            "index compact is only supported for HNSW".into(),
554        ));
555    }
556
557    let store = state.store.clone();
558    let name = request.name.clone();
559    let compacted = tokio::task::spawn_blocking(move || {
560        let mut txn = store.begin(TxnMode::ReadWrite)?;
561        let mut hnsw = HnswIndex::load(&name, &mut txn)?;
562        let result = hnsw.compact()?;
563        hnsw.save(&mut txn)?;
564        txn.commit_self()?;
565        Ok::<_, ServerError>(result)
566    })
567    .await
568    .map_err(|err| ServerError::Internal(err.to_string()));
569    let compacted = match compacted {
570        Ok(result) => result,
571        Err(err) => {
572            state.metrics.record_query(start.elapsed(), false);
573            return Err(err);
574        }
575    };
576
577    state.metrics.record_query(start.elapsed(), true);
578    let _ = compacted;
579    Ok(VectorIndexResponse { success: true })
580}
581
582async fn execute_index_sql(
583    state: Arc<ServerState>,
584    start: Instant,
585    sql: &str,
586) -> Result<VectorIndexResponse> {
587    let mut txn = state.begin_sql_txn().await?;
588    let exec_result =
589        match tokio::time::timeout(state.config.query_timeout, txn.async_execute(sql)).await {
590            Ok(result) => result.map_err(|err| ServerError::Sql(err.into())),
591            Err(_) => Err(ServerError::Timeout("query timeout".into())),
592        };
593    let exec_result = match exec_result {
594        Ok(result) => result,
595        Err(err) => {
596            let _ = txn.async_rollback().await;
597            state.metrics.record_query(start.elapsed(), false);
598            return Err(err);
599        }
600    };
601    if let Err(err) = txn.async_commit().await {
602        state.metrics.record_query(start.elapsed(), false);
603        return Err(ServerError::Sql(err.into()));
604    }
605
606    match exec_result {
607        alopex_sql::executor::ExecutionResult::Success
608        | alopex_sql::executor::ExecutionResult::RowsAffected(_) => {
609            state.metrics.record_query(start.elapsed(), true);
610            Ok(VectorIndexResponse { success: true })
611        }
612        _ => {
613            state.metrics.record_query(start.elapsed(), false);
614            Err(ServerError::BadRequest(
615                "vector index operation returned unexpected result".into(),
616            ))
617        }
618    }
619}
620
621fn build_create_index_sql(
622    name: &str,
623    table: &str,
624    column: &str,
625    method: Option<&str>,
626    options: &HashMap<String, String>,
627    if_not_exists: bool,
628) -> Result<String> {
629    if name.trim().is_empty() || table.trim().is_empty() || column.trim().is_empty() {
630        return Err(ServerError::BadRequest(
631            "index name, table, and column must not be empty".into(),
632        ));
633    }
634
635    let method_sql = match method.map(|m| m.to_ascii_lowercase()) {
636        None => String::new(),
637        Some(m) if m == "hnsw" => " USING HNSW".to_string(),
638        Some(m) if m == "btree" => " USING BTREE".to_string(),
639        Some(other) => {
640            return Err(ServerError::BadRequest(format!(
641                "unsupported index method: {other}"
642            )))
643        }
644    };
645    let options_sql = build_index_options_sql(options);
646    let if_not_exists = if if_not_exists { " IF NOT EXISTS" } else { "" };
647    Ok(format!(
648        "CREATE INDEX{if_not_exists} {} ON {} ({}){method_sql}{options_sql}",
649        quote_ident(name),
650        quote_ident(table),
651        quote_ident(column)
652    ))
653}
654
655fn build_drop_index_sql(name: &str, if_exists: bool) -> String {
656    let if_exists = if if_exists { " IF EXISTS" } else { "" };
657    format!("DROP INDEX{if_exists} {}", quote_ident(name))
658}
659
660fn build_index_options_sql(options: &HashMap<String, String>) -> String {
661    if options.is_empty() {
662        return String::new();
663    }
664    let mut items = Vec::with_capacity(options.len());
665    for (key, value) in options {
666        let quoted = value.replace('\'', "''");
667        items.push(format!("{}='{}'", quote_ident(key), quoted));
668    }
669    format!(" WITH ({})", items.join(", "))
670}
671
672fn resolve_vector_table(
673    state: &ServerState,
674    table: &str,
675    column: Option<&str>,
676) -> Result<(
677    alopex_sql::catalog::TableMetadata,
678    String,
679    alopex_sql::ast::ddl::VectorMetric,
680)> {
681    let catalog = state
682        .catalog
683        .read()
684        .map_err(|_| ServerError::Internal("catalog lock poisoned".into()))?;
685    let table_meta = catalog
686        .get_table(table)
687        .cloned()
688        .ok_or_else(|| ServerError::NotFound("table not found".into()))?;
689
690    let (vector_col, metric) = match column {
691        Some(name) => {
692            let col = table_meta
693                .columns
694                .iter()
695                .find(|c| c.name == name)
696                .ok_or_else(|| ServerError::NotFound("vector column not found".into()))?;
697            match &col.data_type {
698                alopex_sql::planner::ResolvedType::Vector { metric, .. } => {
699                    (col.name.clone(), *metric)
700                }
701                _ => {
702                    return Err(ServerError::BadRequest(
703                        "specified column is not a vector".into(),
704                    ))
705                }
706            }
707        }
708        None => {
709            let col = table_meta
710                .columns
711                .iter()
712                .find(|c| {
713                    matches!(
714                        c.data_type,
715                        alopex_sql::planner::ResolvedType::Vector { .. }
716                    )
717                })
718                .ok_or_else(|| ServerError::BadRequest("vector column not found".into()))?;
719            match &col.data_type {
720                alopex_sql::planner::ResolvedType::Vector { metric, .. } => {
721                    (col.name.clone(), *metric)
722                }
723                _ => return Err(ServerError::BadRequest("vector column not found".into())),
724            }
725        }
726    };
727
728    Ok((table_meta, vector_col, metric))
729}
730
731fn primary_key_index(table: &alopex_sql::catalog::TableMetadata) -> Option<usize> {
732    if let Some(keys) = table.primary_key.as_ref() {
733        if let Some(primary) = keys.first() {
734            return table.columns.iter().position(|col| col.name == *primary);
735        }
736    }
737    table.columns.iter().position(|col| col.primary_key)
738}
739
740fn format_vector_literal(vector: &[f32]) -> String {
741    let body = vector
742        .iter()
743        .map(|v| format!("{v}"))
744        .collect::<Vec<_>>()
745        .join(", ");
746    format!("[{body}]")
747}
748
749fn metric_to_string(metric: alopex_sql::ast::ddl::VectorMetric) -> &'static str {
750    match metric {
751        alopex_sql::ast::ddl::VectorMetric::Cosine => "cosine",
752        alopex_sql::ast::ddl::VectorMetric::L2 => "l2",
753        alopex_sql::ast::ddl::VectorMetric::Inner => "inner",
754    }
755}
756
757fn quote_ident(ident: &str) -> String {
758    ident.to_string()
759}
760
761fn is_unique_violation(err: &alopex_sql::executor::ExecutorError) -> bool {
762    use alopex_sql::executor::ConstraintViolation;
763    use alopex_sql::executor::ExecutorError;
764
765    matches!(
766        err,
767        ExecutorError::ConstraintViolation(ConstraintViolation::PrimaryKey { .. })
768            | ExecutorError::ConstraintViolation(ConstraintViolation::Unique { .. })
769    )
770}
771
772fn default_k() -> usize {
773    10
774}