use std::collections::HashMap;
use std::env;
use std::fs::File;
use std::io::Read;
use log::{debug, error, info};
use serde::{Deserialize, Serialize};
use crate::{
VAR_KAFKA_BOOTSTRAP_SERVERS, VAR_KAFKA_CONSUMER_GROUP_TYPE, VAR_LOCAL_DATASTREAMS_JSON,
VAR_SCHEMA_REGISTRY_HOST, utils,
};
#[doc(inline)]
pub use error::DatastreamError;
mod error;
// Default file name looked up in the current working directory when
// VAR_LOCAL_DATASTREAMS_JSON is not set (see `Datastream::load_local_datastreams`).
const FILE_NAME: &str = "local_datastreams.json";
/// Kafka datastream configuration as served by the DSH platform, or read from
/// a local `local_datastreams.json` file for development.
///
/// Field names double as the JSON schema, so they must not be renamed.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct Datastream {
    // Kafka bootstrap servers (host:port entries).
    brokers: Vec<String>,
    // Streams keyed by their two-segment name, e.g. "stream.test".
    streams: HashMap<String, Stream>,
    // Consumer group ids selected via `GroupType::Private(index)`.
    private_consumer_groups: Vec<String>,
    // Consumer group ids selected via `GroupType::Shared(index)`.
    shared_consumer_groups: Vec<String>,
    // Names of streams whose messages are not enveloped — presumably the DSH
    // envelope format; TODO confirm against platform docs.
    non_enveloped_streams: Vec<String>,
    // Schema registry URL (see `schema_store()`).
    schema_store: String,
}
impl Datastream {
    /// Returns the Kafka brokers as borrowed string slices.
    pub fn get_brokers(&self) -> Vec<&str> {
        self.brokers.iter().map(String::as_str).collect()
    }

    /// Returns the Kafka brokers joined into a single `", "`-separated string.
    pub fn get_brokers_string(&self) -> String {
        self.brokers.join(", ")
    }

    /// Returns the consumer group id for the given group type and index.
    ///
    /// # Errors
    /// Returns [`DatastreamError::IndexGroupIdError`] when the index is out of
    /// bounds for the corresponding consumer group list.
    pub fn get_group_id(&self, group_type: GroupType) -> Result<&str, DatastreamError> {
        let group_id = match group_type {
            GroupType::Private(i) => self.private_consumer_groups.get(i),
            GroupType::Shared(i) => self.shared_consumer_groups.get(i),
        };
        group_id
            .map(String::as_str)
            .ok_or(DatastreamError::IndexGroupIdError(group_type))
    }

    /// Returns all streams, keyed by their two-segment name (e.g. "stream.test").
    pub fn streams(&self) -> &HashMap<String, Stream> {
        &self.streams
    }

    /// Looks up the stream for a full topic name by matching only its first two
    /// dot-separated segments ("stream.test.tenant" matches key "stream.test").
    pub fn get_stream(&self, topic: &str) -> Option<&Stream> {
        self.streams().get(&Self::topic_prefix(topic))
    }

    // First two dot-separated segments of a topic, rejoined with '.'.
    // Shared by `get_stream` and `verify_list_of_topics` (was duplicated inline).
    fn topic_prefix(topic: &str) -> String {
        topic.split('.').take(2).collect::<Vec<&str>>().join(".")
    }

    /// Verifies that every topic in `topics` belongs to a stream for which this
    /// datastream has the requested (read or write) access.
    ///
    /// Comparison is done on the first two dot-separated segments, with regex
    /// escape backslashes stripped from the stream patterns.
    ///
    /// # Errors
    /// Returns [`DatastreamError::NotFoundTopicError`] for the first topic with
    /// no matching accessible stream.
    pub fn verify_list_of_topics<T: std::fmt::Display>(
        &self,
        topics: &[T],
        access: ReadWriteAccess,
    ) -> Result<(), DatastreamError> {
        let accessible = self
            .streams()
            .values()
            .map(|stream| {
                let pattern = match access {
                    ReadWriteAccess::Read => &stream.read,
                    ReadWriteAccess::Write => &stream.write,
                };
                // Patterns may be escaped regexes like "stream\.test\.[^.]*";
                // reduce them to a bare two-segment prefix for comparison.
                Self::topic_prefix(pattern).replace('\\', "")
            })
            .collect::<Vec<String>>();
        for topic in topics {
            let topic_name = Self::topic_prefix(&topic.to_string());
            if !accessible.contains(&topic_name) {
                return Err(DatastreamError::NotFoundTopicError(topic.to_string()));
            }
        }
        Ok(())
    }

    /// Returns the schema registry (schema store) URL.
    pub fn schema_store(&self) -> &str {
        &self.schema_store
    }

    /// Serializes this datastream as pretty-printed JSON to
    /// `<path>/datastreams.json`.
    ///
    /// # Errors
    /// Returns an error when serialization or writing the file fails.
    pub fn to_file(&self, path: &std::path::Path) -> Result<(), DatastreamError> {
        let json_string = serde_json::to_string_pretty(self)?;
        let file_path = path.join("datastreams.json");
        std::fs::write(&file_path, json_string)?;
        // Log the file actually written, not just the target directory.
        info!("File created ({})", file_path.display());
        Ok(())
    }

    /// Fetches the datastream config from the DSH platform (async).
    ///
    /// # Errors
    /// Returns [`DatastreamError::DshCallError`] on a non-success HTTP status,
    /// or a transport/deserialization error propagated from `reqwest`.
    pub async fn fetch(
        client: &reqwest::Client,
        host: &str,
        tenant: &str,
        task_id: &str,
    ) -> Result<Self, DatastreamError> {
        let url = Self::datastreams_endpoint(host, tenant, task_id);
        let response = client.get(&url).send().await?;
        if !response.status().is_success() {
            return Err(DatastreamError::DshCallError {
                url,
                status_code: response.status(),
                error_body: response.text().await.unwrap_or_default(),
            });
        }
        Ok(response.json().await?)
    }

    /// Fetches the datastream config from the DSH platform (blocking).
    ///
    /// # Errors
    /// Returns [`DatastreamError::DshCallError`] on a non-success HTTP status,
    /// or a transport/deserialization error propagated from `reqwest`.
    pub fn fetch_blocking(
        client: &reqwest::blocking::Client,
        host: &str,
        tenant: &str,
        task_id: &str,
    ) -> Result<Self, DatastreamError> {
        let url = Self::datastreams_endpoint(host, tenant, task_id);
        let response = client.get(&url).send()?;
        if !response.status().is_success() {
            return Err(DatastreamError::DshCallError {
                url,
                status_code: response.status(),
                error_body: response.text().unwrap_or_default(),
            });
        }
        Ok(response.json()?)
    }

    /// Builds the DSH endpoint URL that serves the datastream config.
    pub(crate) fn datastreams_endpoint(host: &str, tenant: &str, task_id: &str) -> String {
        format!("{}/kafka/config/{}/{}", host, tenant, task_id)
    }

    /// Loads a datastream config from a local JSON file.
    ///
    /// The file is taken from the `LOCAL_DATASTREAMS_JSON` env var when set,
    /// otherwise `local_datastreams.json` in the current working directory.
    /// Brokers and schema store may be overridden via environment variables.
    ///
    /// # Panics
    /// Panics when the env-var-configured file does not exist or when the file
    /// content is not valid datastream JSON (the test suite relies on these
    /// panics as a fail-fast for misconfiguration).
    pub(crate) fn load_local_datastreams() -> Result<Self, DatastreamError> {
        let path_buf = if let Ok(path) = utils::get_env_var(VAR_LOCAL_DATASTREAMS_JSON) {
            let path = std::path::PathBuf::from(path);
            if !path.exists() {
                // An explicitly configured path that is missing is a setup bug.
                panic!("{} not found", path.display());
            }
            path
        } else {
            std::env::current_dir().unwrap().join(FILE_NAME)
        };
        debug!("Reading local datastreams from {}", path_buf.display());
        let mut file = File::open(&path_buf).map_err(|e| {
            debug!(
                "Failed to open local_datastreams.json ({}): {}",
                path_buf.display(),
                e
            );
            DatastreamError::IoError(e)
        })?;
        let mut contents = String::new();
        // Propagate read failures instead of panicking (was `unwrap()`).
        file.read_to_string(&mut contents)
            .map_err(DatastreamError::IoError)?;
        let mut datastream: Datastream = serde_json::from_str(&contents)
            .unwrap_or_else(|e| panic!("Failed to parse {}: {:?}", path_buf.display(), e));
        // Environment overrides for local development.
        if let Ok(brokers) = utils::get_env_var(VAR_KAFKA_BOOTSTRAP_SERVERS) {
            datastream.brokers = brokers.split(',').map(|s| s.to_string()).collect();
        }
        if let Ok(schema_store) = utils::get_env_var(VAR_SCHEMA_REGISTRY_HOST) {
            datastream.schema_store = schema_store;
        }
        Ok(datastream)
    }
}
impl Default for Datastream {
    /// Local-development fallback configuration: a tenant-derived consumer
    /// group id, plus localhost broker and schema registry endpoints unless
    /// overridden via environment variables.
    fn default() -> Self {
        let tenant = utils::tenant_name().unwrap_or("local".to_string());
        let group_id = format!("{}_default_group", tenant);
        let brokers: Vec<String> = match utils::get_env_var(VAR_KAFKA_BOOTSTRAP_SERVERS) {
            Ok(list) => list.split(',').map(str::to_string).collect(),
            Err(_) => vec!["localhost:9092".to_string()],
        };
        let schema_store = match utils::get_env_var(VAR_SCHEMA_REGISTRY_HOST) {
            Ok(host) => host,
            Err(_) => "http://localhost:8081/apis/ccompat/v7".to_string(),
        };
        Datastream {
            brokers,
            streams: HashMap::new(),
            private_consumer_groups: vec![group_id.clone()],
            shared_consumer_groups: vec![group_id],
            non_enveloped_streams: Vec::new(),
            schema_store,
        }
    }
}
/// A single stream definition within a [`Datastream`] config.
///
/// Deserialized from camelCase JSON keys, so field names must stay in sync
/// with the platform's schema.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Stream {
    // Two-segment stream name, e.g. "scratch.test".
    name: String,
    // Cluster identifier, e.g. "/tt".
    cluster: String,
    // Read topic or pattern; an empty string means no read access.
    read: String,
    // Write topic or pattern; an empty string means no write access.
    write: String,
    // Number of Kafka partitions.
    partitions: i32,
    // Replication factor.
    replication: i32,
    // Partitioner name, e.g. "default-partitioner".
    partitioner: String,
    partitioning_depth: i32,
    // Whether messages on this stream can be retained.
    can_retain: bool,
}
impl Stream {
    /// Stream name (two-segment form, e.g. "scratch.test").
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Cluster identifier.
    pub fn cluster(&self) -> &str {
        self.cluster.as_str()
    }

    /// Raw read topic or pattern; empty when the stream is not readable.
    pub fn read(&self) -> &str {
        self.read.as_str()
    }

    /// Raw write topic or pattern; empty when the stream is not writable.
    pub fn write(&self) -> &str {
        self.write.as_str()
    }

    /// Number of Kafka partitions.
    pub fn partitions(&self) -> i32 {
        self.partitions
    }

    /// Replication factor.
    pub fn replication(&self) -> i32 {
        self.replication
    }

    /// Partitioner name.
    pub fn partitioner(&self) -> &str {
        self.partitioner.as_str()
    }

    /// Partitioning depth.
    pub fn partitioning_depth(&self) -> i32 {
        self.partitioning_depth
    }

    /// Whether messages on this stream can be retained.
    pub fn can_retain(&self) -> bool {
        self.can_retain
    }

    /// True when a read topic/pattern is configured (non-empty `read`).
    pub fn read_access(&self) -> bool {
        !self.read.is_empty()
    }

    /// True when a write topic/pattern is configured (non-empty `write`).
    pub fn write_access(&self) -> bool {
        !self.write.is_empty()
    }

    /// Read topic/pattern for this stream.
    ///
    /// # Errors
    /// Returns [`DatastreamError::TopicPermissionsError`] when the stream has
    /// no read access.
    pub fn read_pattern(&self) -> Result<&str, DatastreamError> {
        if !self.read_access() {
            return Err(DatastreamError::TopicPermissionsError(
                self.name.clone(),
                ReadWriteAccess::Read,
            ));
        }
        Ok(self.read.as_str())
    }

    /// Write topic/pattern for this stream.
    ///
    /// # Errors
    /// Returns [`DatastreamError::TopicPermissionsError`] when the stream has
    /// no write access.
    pub fn write_pattern(&self) -> Result<&str, DatastreamError> {
        if !self.write_access() {
            return Err(DatastreamError::TopicPermissionsError(
                self.name.clone(),
                ReadWriteAccess::Write,
            ));
        }
        Ok(self.write.as_str())
    }
}
/// The kind of topic access being requested or verified.
// `Eq` derived for consistency (no non-total-equality payloads).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReadWriteAccess {
    Read,
    Write,
}
/// Kafka consumer group selection: private or shared, carrying an index into
/// the corresponding consumer group list of a `Datastream`.
// `Clone` and `Eq` derived for consistency with `ReadWriteAccess`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GroupType {
    Private(usize),
    Shared(usize),
}
impl GroupType {
    /// Determines the group type from the `KAFKA_CONSUMER_GROUP_TYPE`
    /// environment variable (case-insensitive "private" or "shared").
    /// Defaults to `Shared(0)` when unset or unrecognized, logging accordingly.
    pub fn from_env() -> Self {
        match env::var(VAR_KAFKA_CONSUMER_GROUP_TYPE).as_deref() {
            Ok(value) if value.eq_ignore_ascii_case("private") => GroupType::Private(0),
            Ok(value) if value.eq_ignore_ascii_case("shared") => GroupType::Shared(0),
            Ok(_) => {
                // Set, but to something we don't understand: warn loudly.
                error!(
                    "KAFKA_CONSUMER_GROUP_TYPE is not set to \"shared\" or \"private\". Defaulting to shared group type."
                );
                GroupType::Shared(0)
            }
            Err(_) => {
                debug!("KAFKA_CONSUMER_GROUP_TYPE is not set, defaulting to shared group type.");
                GroupType::Shared(0)
            }
        }
    }
}
impl std::fmt::Display for GroupType {
    /// Formats as `"<kind>; index: <i>"`, e.g. `private; index: 0`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (kind, index) = match self {
            GroupType::Private(i) => ("private", i),
            GroupType::Shared(i) => ("shared", i),
        };
        write!(f, "{kind}; index: {index}")
    }
}
// Unit tests. These rely on fixture files under `test_resources/` and use
// `serial_test::serial` to serialize tests that mutate process-wide
// environment variables.
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;

    // Helper: deserialize the valid fixture into a `Datastream`.
    fn datastream() -> Datastream {
        serde_json::from_str(datastreams_json().as_str()).unwrap()
    }

    // Helper: raw JSON contents of the valid fixture file.
    fn datastreams_json() -> String {
        std::fs::File::open("test_resources/valid_datastreams.json")
            .map(|mut file| {
                let mut contents = String::new();
                file.read_to_string(&mut contents).unwrap();
                contents
            })
            .unwrap()
    }

    // --- Stream getter tests against the fixture's two streams ---

    #[test]
    fn test_name() {
        let datastream = datastream();
        let stream = datastream.streams().get("scratch.test").unwrap();
        assert_eq!(stream.name(), "scratch.test");
        let stream = datastream.streams().get("stream.test").unwrap();
        assert_eq!(stream.name(), "stream.test");
    }

    #[test]
    fn test_read() {
        let datastream = datastream();
        let stream = datastream.streams().get("scratch.test").unwrap();
        assert_eq!(stream.read(), "scratch.test.test-tenant");
        let stream = datastream.streams().get("stream.test").unwrap();
        assert_eq!(stream.read(), "stream\\.test\\.[^.]*");
    }

    #[test]
    fn test_write() {
        let datastream = datastream();
        let stream = datastream.streams().get("scratch.test").unwrap();
        assert_eq!(stream.write(), "scratch.test.test-tenant");
        let stream = datastream.streams().get("stream.test").unwrap();
        assert_eq!(stream.write(), "");
    }

    #[test]
    fn test_cluster() {
        let datastream = datastream();
        let stream = datastream.streams().get("scratch.test").unwrap();
        assert_eq!(stream.cluster(), "/tt");
        let stream = datastream.streams().get("stream.test").unwrap();
        assert_eq!(stream.cluster(), "/tt");
    }

    #[test]
    fn test_partitions() {
        let datastream = datastream();
        let stream = datastream.streams().get("scratch.test").unwrap();
        assert_eq!(stream.partitions(), 3);
        let stream = datastream.streams().get("stream.test").unwrap();
        assert_eq!(stream.partitions(), 1);
    }

    #[test]
    fn test_replication() {
        let datastream = datastream();
        let stream = datastream.streams().get("scratch.test").unwrap();
        assert_eq!(stream.replication(), 1);
        let stream = datastream.streams().get("stream.test").unwrap();
        assert_eq!(stream.replication(), 1);
    }

    #[test]
    fn test_partitioner() {
        let datastream = datastream();
        let stream = datastream.streams().get("scratch.test").unwrap();
        assert_eq!(stream.partitioner(), "default-partitioner");
        let stream = datastream.streams().get("stream.test").unwrap();
        assert_eq!(stream.partitioner(), "default-partitioner");
    }

    #[test]
    fn test_partitioning_depth() {
        let datastream = datastream();
        let stream = datastream.streams().get("scratch.test").unwrap();
        assert_eq!(stream.partitioning_depth(), 0);
        let stream = datastream.streams().get("stream.test").unwrap();
        assert_eq!(stream.partitioning_depth(), 0);
    }

    #[test]
    fn test_can_retain() {
        let datastream = datastream();
        let stream = datastream.streams().get("scratch.test").unwrap();
        assert_eq!(stream.can_retain(), false);
        let stream = datastream.streams().get("stream.test").unwrap();
        assert_eq!(stream.can_retain(), true);
    }

    // --- Datastream-level accessors ---

    #[test]
    fn test_datastream_get_brokers() {
        assert_eq!(
            datastream().get_brokers(),
            vec![
                "broker-0.tt.kafka.mesos:9091",
                "broker-1.tt.kafka.mesos:9091",
                "broker-2.tt.kafka.mesos:9091"
            ]
        );
    }

    #[test]
    fn test_datastream_get_brokers_string() {
        assert_eq!(
            datastream().get_brokers_string(),
            "broker-0.tt.kafka.mesos:9091, broker-1.tt.kafka.mesos:9091, broker-2.tt.kafka.mesos:9091"
        );
    }

    #[test]
    fn test_datastream_verify_list_of_topics() {
        let topics = vec![
            "scratch.test.test-tenant".to_string(),
            "stream.test.test-tenant".to_string(),
        ];
        datastream()
            .verify_list_of_topics(&topics, ReadWriteAccess::Read)
            .unwrap()
    }

    #[test]
    fn test_datastream_get_schema_store() {
        assert_eq!(
            datastream().schema_store(),
            "http://schema-registry.tt.kafka.mesos:8081"
        );
    }

    // Mutates KAFKA_CONSUMER_GROUP_TYPE, hence #[serial].
    #[test]
    #[serial(env_dependency)]
    fn test_datastream_get_group_type_from_env() {
        unsafe {
            env::set_var(VAR_KAFKA_CONSUMER_GROUP_TYPE, "private");
            assert_eq!(GroupType::from_env(), GroupType::Private(0),);
            env::set_var(VAR_KAFKA_CONSUMER_GROUP_TYPE, "shared");
            assert_eq!(GroupType::from_env(), GroupType::Shared(0),);
            env::set_var(VAR_KAFKA_CONSUMER_GROUP_TYPE, "invalid-type");
            assert_eq!(GroupType::from_env(), GroupType::Shared(0),);
            env::remove_var(VAR_KAFKA_CONSUMER_GROUP_TYPE);
            assert_eq!(GroupType::from_env(), GroupType::Shared(0),);
        }
    }

    #[test]
    fn test_datastream_get_group_id() {
        assert_eq!(
            datastream().get_group_id(GroupType::Private(0)).unwrap(),
            "test-app.7e93a513-6556-11eb-841e-f6ab8576620c_1",
            "KAFKA_CONSUMER_GROUP_TYPE is set to private, but did not return test-app.7e93a513-6556-11eb-841e-f6ab8576620c_1"
        );
        assert_eq!(
            datastream().get_group_id(GroupType::Shared(0)).unwrap(),
            "test-app_1",
            "KAFKA_CONSUMER_GROUP_TYPE is set to shared, but did not return test-app_1"
        );
        assert_eq!(
            datastream().get_group_id(GroupType::Shared(3)).unwrap(),
            "test-app_4",
            "KAFKA_CONSUMER_GROUP_TYPE is set to shared, but did not return test-app_1"
        );
        // Out-of-range index must produce an error, not a panic.
        assert!(datastream().get_group_id(GroupType::Private(1000)).is_err(),);
    }

    // --- Access checks and read/write patterns ---

    #[test]
    fn test_datastream_check_access_read_topic() {
        assert_eq!(
            datastream()
                .get_stream("scratch.test.test-tenant")
                .unwrap()
                .read_access(),
            true
        );
        assert_eq!(
            datastream()
                .get_stream("stream.test.test-tenant")
                .unwrap()
                .read_access(),
            true
        );
    }

    #[test]
    fn test_datastream_check_access_write_topic() {
        assert_eq!(
            datastream()
                .get_stream("scratch.test.test-tenant")
                .unwrap()
                .write_access(),
            true
        );
        assert_eq!(
            datastream()
                .get_stream("stream.test.test-tenant")
                .unwrap()
                .write_access(),
            false
        );
    }

    #[test]
    fn test_datastream_check_read_topic() {
        assert_eq!(
            datastream()
                .get_stream("scratch.test.test-tenant")
                .unwrap()
                .read_pattern()
                .unwrap(),
            "scratch.test.test-tenant"
        );
        assert_eq!(
            datastream()
                .get_stream("stream.test.test-tenant")
                .unwrap()
                .read_pattern()
                .unwrap(),
            "stream\\.test\\.[^.]*"
        );
    }

    #[test]
    fn test_datastream_check_write_topic() {
        assert_eq!(
            datastream()
                .get_stream("scratch.test.test-tenant")
                .unwrap()
                .write_pattern()
                .unwrap(),
            "scratch.test.test-tenant"
        );
        // Stream with empty write pattern must yield a permissions error.
        let e = datastream()
            .get_stream("stream.test.test-tenant")
            .unwrap()
            .write_pattern()
            .unwrap_err();
        assert!(matches!(
            e,
            DatastreamError::TopicPermissionsError(_, ReadWriteAccess::Write)
        ));
    }

    #[test]
    fn test_to_file() {
        let test_path = std::path::PathBuf::from("test_files");
        let result = datastream().to_file(&test_path);
        assert!(result.is_ok())
    }

    // --- load_local_datastreams: env-var override and panic behavior ---
    // The failure cases run in a spawned thread so the expected panic can be
    // observed as `join()` returning Err.

    #[test]
    #[serial(env_dependency)]
    fn test_load_local_valid_datastreams() {
        unsafe {
            let datastream = Datastream::load_local_datastreams().is_ok();
            assert!(datastream);
            let current_dir = env::current_dir().unwrap();
            let file_location = format!(
                "{}/test_resources/valid_datastreams.json",
                current_dir.display()
            );
            println!("file_location: {}", file_location);
            env::set_var(VAR_LOCAL_DATASTREAMS_JSON, file_location);
            let datastream = Datastream::load_local_datastreams().is_ok();
            assert!(datastream);
            env::remove_var(VAR_LOCAL_DATASTREAMS_JSON);
        }
    }

    #[test]
    #[serial(env_dependency)]
    fn test_load_local_nonexisting_datastreams() {
        unsafe {
            let current_dir = env::current_dir().unwrap();
            let file_location = format!(
                "{}/test_resoources/nonexisting_datastreams.json",
                current_dir.display()
            );
            env::set_var(VAR_LOCAL_DATASTREAMS_JSON, file_location);
            let join_handle = std::thread::spawn(move || {
                let _ = Datastream::load_local_datastreams();
            });
            let result = join_handle.join();
            assert!(result.is_err());
            env::remove_var(VAR_LOCAL_DATASTREAMS_JSON);
        }
    }

    #[test]
    #[serial(env_dependency)]
    fn test_load_local_invalid_datastreams() {
        unsafe {
            let current_dir = env::current_dir().unwrap();
            let file_location = format!(
                "{}/test_resources/invalid_datastreams.json",
                current_dir.display()
            );
            env::set_var(VAR_LOCAL_DATASTREAMS_JSON, file_location);
            let join_handle = std::thread::spawn(move || {
                let _ = Datastream::load_local_datastreams();
            });
            let result = join_handle.join();
            assert!(result.is_err());
            env::remove_var(VAR_LOCAL_DATASTREAMS_JSON);
        }
    }

    #[test]
    #[serial(env_dependency)]
    fn test_load_local_invalid_json() {
        unsafe {
            let current_dir = env::current_dir().unwrap();
            let file_location = format!(
                "{}/test_resources/invalid_datastreams_missing_field.json",
                current_dir.display()
            );
            env::set_var(VAR_LOCAL_DATASTREAMS_JSON, file_location);
            let join_handle = std::thread::spawn(move || {
                let _ = Datastream::load_local_datastreams();
            });
            let result = join_handle.join();
            assert!(result.is_err());
            env::remove_var(VAR_LOCAL_DATASTREAMS_JSON);
        }
    }

    // --- Endpoint construction and HTTP fetch (mockito-backed) ---

    #[test]
    fn test_datastream_endpoint() {
        let host = "http://localhost:8080";
        let tenant = "test-tenant";
        let task_id = "test-task-id";
        let endpoint = Datastream::datastreams_endpoint(host, tenant, task_id);
        assert_eq!(
            endpoint,
            "http://localhost:8080/kafka/config/test-tenant/test-task-id"
        );
    }

    #[tokio::test]
    async fn test_fetch() {
        let mut dsh = mockito::Server::new_async().await;
        let tenant = "test-tenant";
        let task_id = "test-task-id";
        let host = dsh.url();
        dsh.mock("GET", "/kafka/config/test-tenant/test-task-id")
            .with_status(200)
            .with_header("content-type", "application/json")
            .with_body(datastreams_json())
            .create();
        let client = reqwest::Client::new();
        let fetched_datastream = Datastream::fetch(&client, &host, tenant, task_id)
            .await
            .unwrap();
        assert_eq!(fetched_datastream, datastream());
    }

    #[test]
    fn test_fetch_blocking() {
        let mut dsh = mockito::Server::new();
        let tenant = "test-tenant";
        let task_id = "test-task-id";
        let host = dsh.url();
        dsh.mock("GET", "/kafka/config/test-tenant/test-task-id")
            .with_status(200)
            .with_header("content-type", "application/json")
            .with_body(datastreams_json())
            .create();
        let client = reqwest::blocking::Client::new();
        let fetched_datastream =
            Datastream::fetch_blocking(&client, &host, tenant, task_id).unwrap();
        assert_eq!(fetched_datastream, datastream());
    }
}