use crate::collections::error::SchemaError;
use crate::collections::schema::{
Class, Classes, Property, Shard, ShardStatus, Shards, Tenant, Tenants,
};
use reqwest::Url;
use std::error::Error;
use std::sync::Arc;
#[derive(Debug)]
pub struct Schema {
endpoint: Url,
client: Arc<reqwest::Client>,
}
impl Schema {
pub(super) fn new(url: &Url, client: Arc<reqwest::Client>) -> Result<Self, Box<dyn Error>> {
let endpoint = url.join("/v1/schema/")?;
Ok(Schema { endpoint, client })
}
pub async fn get_class(&self, class_name: &str) -> Result<Class, Box<dyn Error>> {
let endpoint = self.endpoint.join(class_name)?;
let res = self.client.get(endpoint).send().await?;
match res.status() {
reqwest::StatusCode::OK => {
let res: Class = res.json().await?;
Ok(res)
},
_ => Err(self.get_err_msg("get class", res).await),
}
}
pub async fn get(&self) -> Result<Classes, Box<dyn Error>> {
let res = self.client.get(self.endpoint.clone()).send().await?;
match res.status() {
reqwest::StatusCode::OK => {
let res: Classes = res.json().await?;
Ok(res)
}
_ => Err(self.get_err_msg("get schema", res).await),
}
}
pub async fn create_class(&self, class: &Class) -> Result<Class, Box<dyn Error>> {
let payload = serde_json::to_value(&class).unwrap();
let res = self
.client
.post(self.endpoint.clone())
.json(&payload)
.send()
.await?;
match res.status() {
reqwest::StatusCode::OK => {
let res: Class = res.json().await?;
Ok(res)
}
_ => Err(self.get_err_msg("create class", res).await),
}
}
pub async fn delete(&self, class_name: &str) -> Result<bool, Box<dyn Error>> {
let endpoint = self.endpoint.join(class_name)?;
let res = self.client.delete(endpoint).send().await?;
match res.status() {
reqwest::StatusCode::OK => Ok(true),
_ => Err(self.get_err_msg("delete class", res).await),
}
}
pub async fn update(&self, class: &Class) -> Result<Class, Box<dyn Error>> {
let endpoint = self.endpoint.join(&class.class)?;
let payload = serde_json::to_value(&class)?;
let res = self.client.put(endpoint).json(&payload).send().await?;
match res.status() {
reqwest::StatusCode::OK => {
let res: Class = res.json().await?;
Ok(res)
}
_ => Err(self.get_err_msg("update class", res).await),
}
}
pub async fn add_property(
&self,
class_name: &str,
property: &Property,
) -> Result<Property, Box<dyn Error>> {
let mut endpoint = class_name.to_string();
endpoint.push_str("/properties");
let endpoint = self.endpoint.join(&endpoint)?;
let payload = serde_json::to_value(&property)?;
let res = self.client.post(endpoint).json(&payload).send().await?;
match res.status() {
reqwest::StatusCode::OK => {
let res: Property = res.json().await?;
Ok(res)
}
_ => Err(self.get_err_msg("add property", res).await),
}
}
pub async fn get_shards(&self, class_name: &str) -> Result<Shards, Box<dyn Error>> {
let mut endpoint = class_name.to_string();
endpoint.push_str("/shards");
let endpoint = self.endpoint.join(&endpoint)?;
let res = self.client.get(endpoint).send().await?;
match res.status() {
reqwest::StatusCode::OK => {
let shards = res.json::<Vec<Shard>>().await?;
let shards = Shards { shards };
Ok(shards)
}
_ => Err(self.get_err_msg("get shards", res).await),
}
}
pub async fn update_class_shard(
&self,
class_name: &str,
shard_name: &str,
status: ShardStatus,
) -> Result<Shard, Box<dyn Error>> {
let mut endpoint = class_name.to_string();
endpoint.push_str("/shards/");
endpoint.push_str(shard_name);
let endpoint = self.endpoint.join(&endpoint)?;
let payload = serde_json::json!({ "status": status });
let res = self.client.put(endpoint).json(&payload).send().await?;
match res.status() {
reqwest::StatusCode::OK => Ok(Shard {
name: shard_name.into(),
status,
}),
_ => Err(self.get_err_msg("update class shard", res).await),
}
}
pub async fn list_tenants(&self, class_name: &str) -> Result<Tenants, Box<dyn Error>> {
let mut endpoint = class_name.to_string();
endpoint.push_str("/tenants");
let endpoint = self.endpoint.join(&endpoint)?;
let res = self.client.get(endpoint).send().await?;
match res.status() {
reqwest::StatusCode::OK => {
let tenants = res.json::<Vec<Tenant>>().await?;
let tenants = Tenants { tenants };
Ok(tenants)
}
_ => Err(self.get_err_msg("list tenants", res).await),
}
}
pub async fn add_tenants(
&self,
class_name: &str,
tenants: &Tenants,
) -> Result<Tenants, Box<dyn Error>> {
let mut endpoint = class_name.to_string();
endpoint.push_str("/tenants");
let endpoint = self.endpoint.join(&endpoint)?;
let payload = serde_json::to_value(&tenants.tenants)?;
let res = self.client.post(endpoint).json(&payload).send().await?;
match res.status() {
reqwest::StatusCode::OK => {
let tenants = res.json::<Vec<Tenant>>().await?;
let tenants = Tenants { tenants };
Ok(tenants)
}
_ => Err(self.get_err_msg("add tenants", res).await),
}
}
pub async fn remove_tenants(
&self,
class_name: &str,
tenants: &Vec<&str>,
) -> Result<bool, Box<dyn Error>> {
let mut endpoint = class_name.to_string();
endpoint.push_str("/tenants");
let endpoint = self.endpoint.join(&endpoint)?;
let payload = serde_json::to_value(&tenants)?;
let res = self.client.delete(endpoint).json(&payload).send().await?;
match res.status() {
reqwest::StatusCode::OK => Ok(true),
_ => Err(self.get_err_msg("remove tenants", res).await),
}
}
pub async fn update_tenants(
&self,
class_name: &str,
tenants: &Tenants,
) -> Result<Tenants, Box<dyn Error>> {
let mut endpoint = class_name.to_string();
endpoint.push_str("/tenants");
let endpoint = self.endpoint.join(&endpoint)?;
let payload = serde_json::to_value(&tenants.tenants)?;
let res = self.client.put(endpoint).json(&payload).send().await?;
match res.status() {
reqwest::StatusCode::OK => {
let tenants = res.json::<Vec<Tenant>>().await?;
let tenants = Tenants { tenants };
Ok(tenants)
}
_ => Err(self.get_err_msg("update tenants", res).await),
}
}
async fn get_err_msg(&self, endpoint: &str, res: reqwest::Response) -> Box<SchemaError> {
let status_code = res.status();
let msg: Result<serde_json::Value, reqwest::Error> = res.json().await;
let r_str: String;
if let Ok(json) = msg {
r_str = format!(
"Status code `{}` received when calling {} endpoint. Response: {}",
status_code,
endpoint,
json,
);
} else {
r_str = format!(
"Status code `{}` received when calling {} endpoint.",
status_code,
endpoint
);
}
Box::new(SchemaError(r_str))
}
}
#[cfg(test)]
mod tests {
use crate::collections::schema::{
ActivityStatus, Class, ClassBuilder, Classes, Property, Shard, ShardStatus, Shards, Tenant,
Tenants,
};
use crate::WeaviateClient;
fn test_class(class_name: &str) -> Class {
ClassBuilder::new(class_name)
.with_description("Test")
.build()
}
fn test_classes() -> Classes {
let class_a = test_class("Test1");
let class_b = test_class("Test1");
Classes::new(vec![class_a, class_b])
}
fn test_shard() -> Shard {
Shard::new("abcd", ShardStatus::READY)
}
fn test_property(property_name: &str) -> Property {
Property::builder(property_name, vec!["boolean"])
.with_description("test property")
.build()
}
fn test_tenants() -> Tenants {
Tenants::new(vec![
Tenant::builder("TENANT_A").build(),
Tenant::builder("TENANT_B")
.with_activity_status(ActivityStatus::COLD)
.build(),
])
}
fn test_shards() -> Shards {
Shards::new(vec![Shard::new("1D3PBjtz9W7r", ShardStatus::READY)])
}
fn get_test_harness() -> (mockito::ServerGuard, WeaviateClient) {
let mock_server = mockito::Server::new();
let mut host = "http://".to_string();
host.push_str(&mock_server.host_with_port());
let client = WeaviateClient::builder(&host).build().unwrap();
(mock_server, client)
}
fn mock_post(
server: &mut mockito::ServerGuard,
endpoint: &str,
status_code: usize,
body: &str,
) -> mockito::Mock {
server
.mock("POST", endpoint)
.with_status(status_code)
.with_header("content-type", "application/json")
.with_body(body)
.create()
}
fn mock_put(
server: &mut mockito::ServerGuard,
endpoint: &str,
status_code: usize,
body: &str,
) -> mockito::Mock {
server
.mock("PUT", endpoint)
.with_status(status_code)
.with_header("content-type", "application/json")
.with_body(body)
.create()
}
fn mock_get(
server: &mut mockito::ServerGuard,
endpoint: &str,
status_code: usize,
body: &str,
) -> mockito::Mock {
server
.mock("GET", endpoint)
.with_status(status_code)
.with_header("content-type", "application/json")
.with_body(body)
.create()
}
fn mock_delete(
server: &mut mockito::ServerGuard,
endpoint: &str,
status_code: usize,
) -> mockito::Mock {
server
.mock("DELETE", endpoint)
.with_status(status_code)
.create()
}
#[tokio::test]
async fn test_create_class_ok() {
let class = test_class("UnitClass");
let class_str = serde_json::to_string(&class).unwrap();
let (mut mock_server, client) = get_test_harness();
let mock = mock_post(&mut mock_server, "/v1/schema/", 200, &class_str);
let res = client.schema.create_class(&class).await;
mock.assert();
assert!(res.is_ok());
assert_eq!(class.class, res.unwrap().class);
}
#[tokio::test]
async fn test_create_class_err() {
let class = test_class("UnitClass");
let (mut mock_server, client) = get_test_harness();
let mock = mock_post(&mut mock_server, "/v1/schema/", 401, "");
let res = client.schema.create_class(&class).await;
mock.assert();
assert!(res.is_err());
}
#[tokio::test]
async fn test_get_all_classes_ok() {
let classes = test_classes();
let class_str = serde_json::to_string(&classes).unwrap();
let (mut mock_server, client) = get_test_harness();
let mock = mock_get(&mut mock_server, "/v1/schema/", 200, &class_str);
let res = client.schema.get().await;
mock.assert();
assert!(res.is_ok());
assert_eq!(classes.classes[0].class, res.unwrap().classes[0].class);
}
#[tokio::test]
async fn test_get_all_classes_err() {
let (mut mock_server, client) = get_test_harness();
let mock = mock_get(&mut mock_server, "/v1/schema/", 401, "");
let class = client.schema.get().await;
mock.assert();
assert!(class.is_err());
}
#[tokio::test]
async fn test_get_single_class_ok() {
let class = test_class("Test");
let class_str = serde_json::to_string(&class).unwrap();
let (mut mock_server, client) = get_test_harness();
let mock = mock_get(&mut mock_server, "/v1/schema/Test", 200, &class_str);
let res = client.schema.get_class("Test").await;
mock.assert();
assert!(res.is_ok());
assert_eq!(class.class, res.unwrap().class);
}
#[tokio::test]
async fn test_get_single_class_err() {
let (mut mock_server, client) = get_test_harness();
let mock = mock_get(&mut mock_server, "/v1/schema/Test", 401, "");
let class = client.schema.get_class("Test").await;
mock.assert();
assert!(class.is_err());
}
#[tokio::test]
async fn test_get_delete_class_ok() {
let (mut mock_server, client) = get_test_harness();
let mock = mock_delete(&mut mock_server, "/v1/schema/Test", 200);
let res = client.schema.delete("Test").await;
mock.assert();
assert!(res.is_ok());
assert!(res.unwrap());
}
#[tokio::test]
async fn test_get_delete_class_err() {
let (mut mock_server, client) = get_test_harness();
let mock = mock_delete(&mut mock_server, "/v1/schema/Test", 401);
let class = client.schema.delete("Test").await;
mock.assert();
assert!(class.is_err());
}
#[tokio::test]
async fn test_update_class_ok() {
let class = test_class("Test");
let class_str = serde_json::to_string(&class).unwrap();
let (mut mock_server, client) = get_test_harness();
let mock = mock_put(&mut mock_server, "/v1/schema/Test", 200, &class_str);
let res = client.schema.update(&class).await;
mock.assert();
assert!(res.is_ok());
assert_eq!(class.class, res.unwrap().class);
}
#[tokio::test]
async fn test_update_class_err() {
let class = test_class("Test");
let (mut mock_server, client) = get_test_harness();
let mock = mock_put(&mut mock_server, "/v1/schema/Test", 401, "");
let res = client.schema.update(&class).await;
mock.assert();
assert!(res.is_err());
}
#[tokio::test]
async fn test_add_property_ok() {
let property = test_property("Test");
let property_str = serde_json::to_string(&property).unwrap();
let (mut mock_server, client) = get_test_harness();
let mock = mock_post(
&mut mock_server,
"/v1/schema/TestClass/properties",
200,
&property_str,
);
let res = client.schema.add_property("TestClass", &property).await;
mock.assert();
assert!(res.is_ok());
assert_eq!(property.name, res.unwrap().name);
}
#[tokio::test]
async fn test_add_property_err() {
let property = test_property("Test");
let (mut mock_server, client) = get_test_harness();
let mock = mock_post(&mut mock_server, "/v1/schema/TestClass/properties", 401, "");
let res = client.schema.add_property("TestClass", &property).await;
mock.assert();
assert!(res.is_err());
}
#[tokio::test]
async fn test_get_shards_ok() {
let shards = test_shards();
let shards_str = serde_json::to_string(&shards.shards).unwrap();
let (mut mock_server, client) = get_test_harness();
let mock = mock_get(&mut mock_server, "/v1/schema/Test/shards", 200, &shards_str);
let res = client.schema.get_shards("Test").await;
mock.assert();
assert!(res.is_ok());
assert_eq!(shards.shards[0].name, res.unwrap().shards[0].name);
}
#[tokio::test]
async fn test_get_shards_err() {
let (mut mock_server, client) = get_test_harness();
let mock = mock_get(&mut mock_server, "/v1/schema/Test/shards", 401, "");
let res = client.schema.get_shards("Test").await;
mock.assert();
assert!(res.is_err());
}
#[tokio::test]
async fn test_update_class_shard_ok() {
let shard = test_shard();
let shard_str = serde_json::to_string(&shard).unwrap();
let (mut mock_server, client) = get_test_harness();
let mock = mock_put(
&mut mock_server,
"/v1/schema/Test/shards/abcd",
200,
&shard_str,
);
let res = client
.schema
.update_class_shard("Test", "abcd", ShardStatus::READONLY)
.await;
mock.assert();
assert!(res.is_ok());
assert_eq!(shard.name, res.unwrap().name);
}
#[tokio::test]
async fn test_update_class_shard_err() {
let (mut mock_server, client) = get_test_harness();
let mock = mock_put(&mut mock_server, "/v1/schema/Test/shards/abcd", 401, "");
let res = client
.schema
.update_class_shard("Test", "abcd", ShardStatus::READONLY)
.await;
mock.assert();
assert!(res.is_err());
}
#[tokio::test]
async fn test_list_tenants_ok() {
let tenants = test_tenants();
let tenants_str = serde_json::to_string(&tenants.tenants).unwrap();
let (mut mock_server, client) = get_test_harness();
let mock = mock_get(
&mut mock_server,
"/v1/schema/Test/tenants",
200,
&tenants_str,
);
let res = client.schema.list_tenants("Test").await;
mock.assert();
assert!(res.is_ok());
assert_eq!(tenants.tenants[0].name, res.unwrap().tenants[0].name);
}
#[tokio::test]
async fn test_list_tenants_err() {
let (mut mock_server, client) = get_test_harness();
let mock = mock_get(&mut mock_server, "/v1/schema/Test/tenants", 422, "");
let res = client.schema.list_tenants("Test").await;
mock.assert();
assert!(res.is_err());
}
#[tokio::test]
async fn test_add_tenants_ok() {
let tenants = test_tenants();
let tenants_str = serde_json::to_string(&tenants.tenants).unwrap();
let (mut mock_server, client) = get_test_harness();
let mock = mock_post(
&mut mock_server,
"/v1/schema/Test/tenants",
200,
&tenants_str,
);
let res = client.schema.add_tenants("Test", &tenants).await;
mock.assert();
assert!(res.is_ok());
assert_eq!(tenants.tenants[0].name, res.unwrap().tenants[0].name);
}
#[tokio::test]
async fn test_add_tenants_err() {
let tenants = test_tenants();
let (mut mock_server, client) = get_test_harness();
let mock = mock_post(&mut mock_server, "/v1/schema/Test/tenants", 422, "");
let res = client.schema.add_tenants("Test", &tenants).await;
mock.assert();
assert!(res.is_err());
}
#[tokio::test]
async fn test_remove_tenants_ok() {
let (mut mock_server, client) = get_test_harness();
let mock = mock_delete(&mut mock_server, "/v1/schema/Test/tenants", 200);
let res = client
.schema
.remove_tenants("Test", &vec!["TestTenant"])
.await;
mock.assert();
assert!(res.is_ok());
assert!(res.unwrap());
}
#[tokio::test]
async fn test_remove_tenants_err() {
let (mut mock_server, client) = get_test_harness();
let mock = mock_delete(&mut mock_server, "/v1/schema/Test/tenants", 422);
let res = client
.schema
.remove_tenants("Test", &vec!["TestTenant"])
.await;
mock.assert();
assert!(res.is_err());
}
#[tokio::test]
async fn test_update_tenants_ok() {
let tenants = test_tenants();
let tenants_str = serde_json::to_string(&tenants.tenants).unwrap();
let (mut mock_server, client) = get_test_harness();
let mock = mock_put(
&mut mock_server,
"/v1/schema/Test/tenants",
200,
&tenants_str,
);
let res = client.schema.update_tenants("Test", &tenants).await;
mock.assert();
assert!(res.is_ok());
assert_eq!(tenants.tenants[0].name, res.unwrap().tenants[0].name);
}
#[tokio::test]
async fn test_update_tenants_err() {
let tenants = test_tenants();
let (mut mock_server, client) = get_test_harness();
let mock = mock_put(&mut mock_server, "/v1/schema/Test/tenants", 422, "");
let res = client.schema.update_tenants("Test", &tenants).await;
mock.assert();
assert!(res.is_err());
}
}