pub(crate) mod builders;
mod collection;
pub mod config;
mod conversions;
pub mod error;
mod index;
mod payload;
mod points;
mod query;
mod search;
mod sharding_keys;
mod snapshot;
mod version_check;
use std::future::Future;
use std::sync::Arc;
use std::thread;
use tonic::codegen::InterceptedService;
use tonic::transport::{Channel, Uri};
use tonic::Status;
use crate::auth::TokenInterceptor;
use crate::channel_pool::ChannelPool;
use crate::qdrant::{qdrant_client, HealthCheckReply, HealthCheckRequest};
use crate::qdrant_client::config::QdrantConfig;
use crate::qdrant_client::version_check::is_compatible;
use crate::QdrantError;
/// Result type used by the client API: `Ok(T)` on success, [`QdrantError`] on failure.
pub type QdrantResult<T> = Result<T, QdrantError>;
/// Builder alias for [`QdrantConfig`]; this is the type returned by [`Qdrant::from_url`].
pub type QdrantBuilder = QdrantConfig;
/// Qdrant client handle.
///
/// Holds the configuration the client was created with and a pool of gRPC channels.
/// The pool sits behind an [`Arc`], so cloning the client shares the same pool.
#[derive(Clone)]
pub struct Qdrant {
/// Configuration this client was constructed from.
pub config: QdrantConfig,
/// Channel pool used for all requests; shared between clones of this client.
channel: Arc<ChannelPool>,
}
impl Qdrant {
pub fn new(config: QdrantConfig) -> QdrantResult<Self> {
if config.check_compatibility {
let channel = ChannelPool::new(
config.uri.parse::<Uri>()?,
config.timeout,
config.connect_timeout,
config.keep_alive_while_idle,
1, );
let client = Self {
channel: Arc::new(channel),
config: config.clone(),
};
let server_version = thread::scope(|s| {
s.spawn(|| {
tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()
.map_err(QdrantError::Io)?
.block_on(client.health_check())
})
.join()
.expect("Failed to join health check thread")
})
.ok()
.map(|info| info.version);
let client_version = env!("CARGO_PKG_VERSION").to_string();
if let Some(server_version) = server_version {
let is_compatible = is_compatible(Some(&client_version), Some(&server_version));
if !is_compatible {
println!("Client version {client_version} is not compatible with server version {server_version}. \
Major versions should match and minor version difference must not exceed 1. \
Set check_compatibility=false to skip version check.");
}
} else {
println!(
"Failed to obtain server version. \
Unable to check client-server compatibility. \
Set check_compatibility=false to skip version check."
);
}
}
let channel = ChannelPool::new(
config.uri.parse::<Uri>()?,
config.timeout,
config.connect_timeout,
config.keep_alive_while_idle,
config.pool_size,
);
let client = Self {
channel: Arc::new(channel),
config,
};
Ok(client)
}
pub fn from_url(url: &str) -> QdrantBuilder {
QdrantBuilder::from_url(url)
}
fn with_api_key(&self, channel: Channel) -> InterceptedService<Channel, TokenInterceptor> {
let interceptor = TokenInterceptor::new(self.config.api_key.clone());
InterceptedService::new(channel, interceptor)
}
async fn with_root_qdrant_client<T, O: Future<Output = Result<T, Status>>>(
&self,
f: impl Fn(qdrant_client::QdrantClient<InterceptedService<Channel, TokenInterceptor>>) -> O,
) -> QdrantResult<T> {
let result = self
.channel
.with_channel(
|channel| {
let service = self.with_api_key(channel);
let mut client = qdrant_client::QdrantClient::new(service)
.max_decoding_message_size(usize::MAX);
if let Some(compression) = self.config.compression {
client = client
.send_compressed(compression.into())
.accept_compressed(compression.into());
}
f(client)
},
true,
)
.await?;
Ok(result)
}
pub async fn health_check(&self) -> QdrantResult<HealthCheckReply> {
self.with_root_qdrant_client(|mut qdrant_api| async move {
let result = qdrant_api.health_check(HealthCheckRequest {}).await?;
Ok(result.into_inner())
})
.await
}
}