use std::sync::Arc;
use crate::error::{Result, VectorizerError};
use crate::http_transport::HttpTransport;
use crate::models::*;
use crate::transport::{Protocol, Transport};
#[cfg(feature = "umicp")]
use crate::umicp_transport::UmicpTransport;
pub mod collections;
pub mod core;
pub mod discovery;
pub mod files;
pub mod graph;
pub mod qdrant;
pub mod search;
pub mod vectors;
#[derive(Clone)]
pub struct ClientConfig {
pub base_url: Option<String>,
pub connection_string: Option<String>,
pub protocol: Option<Protocol>,
pub api_key: Option<String>,
pub timeout_secs: Option<u64>,
#[cfg(feature = "umicp")]
pub umicp: Option<UmicpConfig>,
pub hosts: Option<HostConfig>,
pub read_preference: Option<ReadPreference>,
}
#[cfg(feature = "umicp")]
#[derive(Clone)]
pub struct UmicpConfig {
pub host: String,
pub port: u16,
}
impl Default for ClientConfig {
fn default() -> Self {
Self {
base_url: Some("http://localhost:15002".to_string()),
connection_string: None,
protocol: None,
api_key: None,
timeout_secs: Some(30),
#[cfg(feature = "umicp")]
umicp: None,
hosts: None,
read_preference: None,
}
}
}
pub struct VectorizerClient {
pub(crate) transport: Arc<dyn Transport>,
protocol: Protocol,
base_url: String,
#[allow(dead_code)]
master_transport: Option<Arc<dyn Transport>>,
#[allow(dead_code)]
replica_transports: Vec<Arc<dyn Transport>>,
#[allow(dead_code)]
replica_index: std::sync::atomic::AtomicUsize,
#[allow(dead_code)]
read_preference: ReadPreference,
#[allow(dead_code)]
is_replica_mode: bool,
pub(crate) config: ClientConfig,
}
impl VectorizerClient {
pub fn base_url(&self) -> &str {
&self.base_url
}
pub fn new(config: ClientConfig) -> Result<Self> {
let timeout_secs = config.timeout_secs.unwrap_or(30);
let (transport, protocol, base_url): (Arc<dyn Transport>, Protocol, String) =
if let Some(ref conn_str) = config.connection_string {
#[allow(unused_variables)]
let (proto, host, port) = crate::transport::parse_connection_string(conn_str)?;
match proto {
Protocol::Http => {
let transport =
HttpTransport::new(&host, config.api_key.as_deref(), timeout_secs)?;
(Arc::new(transport), Protocol::Http, host.clone())
}
#[cfg(feature = "umicp")]
Protocol::Umicp => {
let umicp_port = port.unwrap_or(15003);
let transport = UmicpTransport::new(
&host,
umicp_port,
config.api_key.as_deref(),
timeout_secs,
)?;
let base_url = format!("umicp://{host}:{umicp_port}");
(Arc::new(transport), Protocol::Umicp, base_url)
}
}
} else {
let proto = config.protocol.unwrap_or(Protocol::Http);
match proto {
Protocol::Http => {
let base_url = config
.base_url
.clone()
.unwrap_or_else(|| "http://localhost:15002".to_string());
let transport =
HttpTransport::new(&base_url, config.api_key.as_deref(), timeout_secs)?;
(Arc::new(transport), Protocol::Http, base_url)
}
#[cfg(feature = "umicp")]
Protocol::Umicp => {
#[cfg(feature = "umicp")]
{
let umicp_config = config.umicp.clone().ok_or_else(|| {
VectorizerError::configuration(
"UMICP configuration is required when using UMICP protocol",
)
})?;
let transport = UmicpTransport::new(
&umicp_config.host,
umicp_config.port,
config.api_key.as_deref(),
timeout_secs,
)?;
let base_url =
format!("umicp://{}:{}", umicp_config.host, umicp_config.port);
(Arc::new(transport), Protocol::Umicp, base_url)
}
#[cfg(not(feature = "umicp"))]
{
return Err(VectorizerError::configuration(
"UMICP feature is not enabled. Enable it with --features umicp",
));
}
}
}
};
let (master_transport, replica_transports, is_replica_mode) =
if let Some(ref hosts) = config.hosts {
let master =
HttpTransport::new(&hosts.master, config.api_key.as_deref(), timeout_secs)?;
let replicas: Result<Vec<Arc<dyn Transport>>> = hosts
.replicas
.iter()
.map(|url| {
let t = HttpTransport::new(url, config.api_key.as_deref(), timeout_secs)?;
Ok(Arc::new(t) as Arc<dyn Transport>)
})
.collect();
(
Some(Arc::new(master) as Arc<dyn Transport>),
replicas?,
true,
)
} else {
(None, vec![], false)
};
let read_preference = config.read_preference.unwrap_or(ReadPreference::Replica);
Ok(Self {
transport,
protocol,
base_url,
master_transport,
replica_transports,
replica_index: std::sync::atomic::AtomicUsize::new(0),
read_preference,
is_replica_mode,
config,
})
}
pub fn new_default() -> Result<Self> {
Self::new(ClientConfig::default())
}
pub fn new_with_url(base_url: &str) -> Result<Self> {
Self::new(ClientConfig {
base_url: Some(base_url.to_string()),
..Default::default()
})
}
pub fn new_with_api_key(base_url: &str, api_key: &str) -> Result<Self> {
Self::new(ClientConfig {
base_url: Some(base_url.to_string()),
api_key: Some(api_key.to_string()),
..Default::default()
})
}
pub fn from_connection_string(connection_string: &str, api_key: Option<&str>) -> Result<Self> {
Self::new(ClientConfig {
connection_string: Some(connection_string.to_string()),
api_key: api_key.map(|s| s.to_string()),
..Default::default()
})
}
pub fn protocol(&self) -> Protocol {
self.protocol
}
#[allow(dead_code)]
pub(crate) fn get_write_transport(&self) -> &Arc<dyn Transport> {
if self.is_replica_mode {
self.master_transport.as_ref().unwrap_or(&self.transport)
} else {
&self.transport
}
}
#[allow(dead_code)]
pub(crate) fn get_read_transport(&self, options: Option<&ReadOptions>) -> &Arc<dyn Transport> {
if !self.is_replica_mode {
return &self.transport;
}
let preference = options
.and_then(|o| o.read_preference)
.unwrap_or(self.read_preference);
match preference {
ReadPreference::Master => self.master_transport.as_ref().unwrap_or(&self.transport),
ReadPreference::Replica | ReadPreference::Nearest => {
if self.replica_transports.is_empty() {
return self.master_transport.as_ref().unwrap_or(&self.transport);
}
let idx = self
.replica_index
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
% self.replica_transports.len();
&self.replica_transports[idx]
}
}
}
pub async fn with_master<F, Fut, T>(&self, callback: F) -> Result<T>
where
F: FnOnce(VectorizerClient) -> Fut,
Fut: std::future::Future<Output = Result<T>>,
{
let mut master_config = self.config.clone();
master_config.read_preference = Some(ReadPreference::Master);
let master_client = VectorizerClient::new(master_config)?;
callback(master_client).await
}
pub fn with_transport(transport: Arc<dyn Transport>, base_url: impl Into<String>) -> Self {
let protocol = transport.protocol();
Self {
transport,
protocol,
base_url: base_url.into(),
master_transport: None,
replica_transports: Vec::new(),
replica_index: std::sync::atomic::AtomicUsize::new(0),
read_preference: ReadPreference::Master,
is_replica_mode: false,
config: ClientConfig::default(),
}
}
pub(crate) async fn make_request(
&self,
method: &str,
endpoint: &str,
payload: Option<serde_json::Value>,
) -> Result<String> {
match method {
"GET" => self.transport.get(endpoint).await,
"POST" => self.transport.post(endpoint, payload.as_ref()).await,
"PUT" => self.transport.put(endpoint, payload.as_ref()).await,
"DELETE" => self.transport.delete(endpoint).await,
_ => Err(VectorizerError::configuration(format!(
"Unsupported method: {method}"
))),
}
}
}