use std::future::Future;
use anyhow::{anyhow, Result};
use crate::backends::AnyBackend;
use crate::chunker::Chunk;
use crate::config::TargetConfig;
pub mod base;
pub mod clickhouse;
pub mod mariadb;
#[cfg(feature = "memory")]
pub mod memory_pg;
pub mod pg;
pub mod sqlite;
pub use base::Sink;
pub use clickhouse::ClickhouseSink;
pub use mariadb::MariadbSink;
#[cfg(feature = "memory")]
pub use memory_pg::MemorySink;
pub use pg::PgSink;
pub use sqlite::SqliteSink;
pub enum AnySink {
Pg(PgSink),
Mariadb(MariadbSink),
Sqlite(SqliteSink),
Clickhouse(ClickhouseSink),
#[cfg(feature = "memory")]
Memory(MemorySink),
}
impl Sink for AnySink {
fn create_table(&self) -> impl Future<Output = Result<()>> + Send {
async move {
match self {
AnySink::Pg(s) => s.create_table().await,
AnySink::Mariadb(s) => s.create_table().await,
AnySink::Sqlite(s) => s.create_table().await,
AnySink::Clickhouse(s) => s.create_table().await,
#[cfg(feature = "memory")]
AnySink::Memory(s) => s.create_table().await,
}
}
}
fn write_document(
&self,
doc_id: &str,
chunks: &[Chunk],
embeddings: &[Vec<f32>],
tags_per_chunk: &[Vec<String>],
) -> impl Future<Output = Result<()>> + Send {
async move {
match self {
AnySink::Pg(s) => {
s.write_document(doc_id, chunks, embeddings, tags_per_chunk)
.await
}
AnySink::Mariadb(s) => {
s.write_document(doc_id, chunks, embeddings, tags_per_chunk)
.await
}
AnySink::Sqlite(s) => {
s.write_document(doc_id, chunks, embeddings, tags_per_chunk)
.await
}
AnySink::Clickhouse(s) => {
s.write_document(doc_id, chunks, embeddings, tags_per_chunk)
.await
}
#[cfg(feature = "memory")]
AnySink::Memory(s) => {
s.write_document(doc_id, chunks, embeddings, tags_per_chunk)
.await
}
}
}
}
fn delete_document(&self, doc_id: &str) -> impl Future<Output = Result<i64>> + Send {
async move {
match self {
AnySink::Pg(s) => s.delete_document(doc_id).await,
AnySink::Mariadb(s) => s.delete_document(doc_id).await,
AnySink::Sqlite(s) => s.delete_document(doc_id).await,
AnySink::Clickhouse(s) => s.delete_document(doc_id).await,
#[cfg(feature = "memory")]
AnySink::Memory(_) => Err(anyhow!(
"MemorySink does not implement delete_document; supersede is a write-side concern"
)),
}
}
}
fn count_docs(&self) -> impl Future<Output = Result<i64>> + Send {
async move {
match self {
AnySink::Pg(s) => s.count_docs().await,
AnySink::Mariadb(s) => s.count_docs().await,
AnySink::Sqlite(s) => s.count_docs().await,
AnySink::Clickhouse(s) => s.count_docs().await,
#[cfg(feature = "memory")]
AnySink::Memory(s) => s.count_docs().await,
}
}
}
fn query_top_k(
&self,
query_vec: &[f32],
k: usize,
) -> impl Future<Output = Result<Vec<(String, i32, f64)>>> + Send {
async move {
match self {
AnySink::Pg(s) => s.query_top_k(query_vec, k).await,
AnySink::Mariadb(s) => s.query_top_k(query_vec, k).await,
AnySink::Sqlite(s) => s.query_top_k(query_vec, k).await,
AnySink::Clickhouse(s) => s.query_top_k(query_vec, k).await,
#[cfg(feature = "memory")]
AnySink::Memory(s) => s.query_top_k(query_vec, k).await,
}
}
}
}
pub fn load_sink(cfg: &TargetConfig, backend: AnyBackend, dim: usize) -> Result<AnySink> {
match (cfg, backend) {
(TargetConfig::Postgres(t), AnyBackend::Postgres(b)) => {
#[cfg(feature = "memory")]
if t.memory.is_some() {
return Ok(AnySink::Memory(MemorySink::new(t.clone(), b, dim)));
}
Ok(AnySink::Pg(PgSink::new(t.clone(), b, dim)))
}
(TargetConfig::Mariadb(t), AnyBackend::Mariadb(b)) => {
Ok(AnySink::Mariadb(MariadbSink::new(t.clone(), b, dim)))
}
(TargetConfig::Sqlite(t), AnyBackend::Sqlite(b)) => {
Ok(AnySink::Sqlite(SqliteSink::new(t.clone(), b, dim)))
}
(TargetConfig::Clickhouse(t), AnyBackend::Clickhouse(b)) => {
Ok(AnySink::Clickhouse(ClickhouseSink::new(t.clone(), b, dim)))
}
#[allow(unreachable_patterns)]
_ => Err(anyhow!(
"backend / target type mismatch — programming error in load_sink dispatch"
)),
}
}