use crate::dsh_api_client::DshApiClient;
use crate::types::{ManagedStream, ManagedStreamId, PublicManagedStream};
use crate::{AccessRights, DshApiError, DshApiResult};
use futures::future::try_join_all;
use futures::{join, try_join};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::fmt::{Debug, Display, Formatter};
/// Configuration of a managed stream, which is either internal or public.
///
/// The variant and field names are used by the derived `Serialize` and
/// `Deserialize` implementations, so renaming them changes the serialized form.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub enum Stream {
/// Configuration of an internal managed stream.
Internal { internal_stream: ManagedStream },
/// Configuration of a public managed stream.
Public { public_stream: PublicManagedStream },
}
impl Stream {
pub(crate) fn internal<T>(internal_stream: T) -> Self
where
T: Into<ManagedStream>,
{
Self::Internal { internal_stream: internal_stream.into() }
}
pub(crate) fn public<T>(public_stream: T) -> Self
where
T: Into<PublicManagedStream>,
{
Self::Public { public_stream: public_stream.into() }
}
}
impl Display for Stream {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Stream::Internal { internal_stream } => Display::fmt(&internal_stream, f),
Stream::Public { public_stream } => Display::fmt(public_stream, f),
}
}
}
impl DshApiClient {
/// Returns the access rights that a tenant has on a managed stream.
///
/// The four access checks (internal read/write, public read/write) are issued
/// concurrently. If at least one internal flag is set, the result is derived
/// from the internal flags only; otherwise it is derived from the public flags.
///
/// # Parameters
/// * `stream_id` - identifier of the managed stream
/// * `tenant_id` - identifier of the tenant whose rights are queried
///
/// # Returns
/// * `Ok(Some(rights))` - the tenant has read, write or read/write access
/// * `Ok(None)` - none of the four checks reported access
///
/// # Errors
/// Propagates the first error returned by any of the four access checks.
pub async fn managed_stream_access_rights(&self, stream_id: &ManagedStreamId, tenant_id: &str) -> DshApiResult<Option<AccessRights>> {
    let (internal_read, internal_write, public_read, public_write) = try_join!(
        self.managed_tenant_has_internal_read_access(tenant_id, stream_id),
        self.managed_tenant_has_internal_write_access(tenant_id, stream_id),
        self.managed_tenant_has_public_read_access(tenant_id, stream_id),
        self.managed_tenant_has_public_write_access(tenant_id, stream_id)
    )?;
    // Internal flags win as soon as one of them is set; the public flags are
    // only consulted when the tenant has no internal access at all.
    let (may_read, may_write) = if internal_read || internal_write {
        (internal_read, internal_write)
    } else {
        (public_read, public_write)
    };
    let access_rights = match (may_read, may_write) {
        (true, true) => Some(AccessRights::ReadWrite),
        (true, false) => Some(AccessRights::Read),
        (false, true) => Some(AccessRights::Write),
        (false, false) => None,
    };
    Ok(access_rights)
}
/// Returns the configuration of a managed stream, internal or public.
///
/// The internal and the public configuration are requested concurrently and
/// both requests are awaited before the outcome is evaluated.
///
/// # Parameters
/// * `managed_stream_id` - identifier of the managed stream
///
/// # Returns
/// * `Ok(Some(Stream::Internal { .. }))` - only the internal lookup succeeded
///   and the public lookup reported `NotFound`
/// * `Ok(Some(Stream::Public { .. }))` - only the public lookup succeeded
///   and the internal lookup reported `NotFound`
/// * `Ok(None)` - both lookups reported `NotFound`
///
/// # Errors
/// * `DshApiError::Unexpected` - both lookups succeeded
/// * any non-`NotFound` error from either lookup; when both lookups fail with
///   a non-`NotFound` error, the internal error is returned
pub async fn managed_stream_configuration(&self, managed_stream_id: &ManagedStreamId) -> DshApiResult<Option<Stream>> {
    let (internal_result, public_result) = join!(
        self.get_stream_internal_configuration(managed_stream_id.as_str()),
        self.get_stream_public_configuration(managed_stream_id.as_str())
    );
    match (internal_result, public_result) {
        // A stream id is expected to resolve to at most one kind of stream.
        (Ok(_), Ok(_)) => Err(DshApiError::Unexpected { message: format!("both internal and public managed stream '{}' exist", managed_stream_id), cause: None }),
        // Exactly one lookup succeeded: accept it only if the other one is a plain `NotFound`.
        (Ok(internal_stream), Err(DshApiError::NotFound { .. })) => Ok(Some(Stream::Internal { internal_stream })),
        (Ok(_), Err(public_error)) => Err(public_error),
        (Err(DshApiError::NotFound { .. }), Ok(public_stream)) => Ok(Some(Stream::Public { public_stream })),
        (Err(internal_error), Ok(_)) => Err(internal_error),
        // Both lookups failed: `NotFound` on both sides means the stream does not exist,
        // otherwise report the non-`NotFound` error, preferring the internal one.
        (Err(DshApiError::NotFound { .. }), Err(DshApiError::NotFound { .. })) => Ok(None),
        (Err(DshApiError::NotFound { .. }), Err(public_error)) => Err(public_error),
        (Err(internal_error), Err(_)) => Err(internal_error),
    }
}
/// Returns the configurations of all managed streams, internal and public.
///
/// First the identifiers of the internal and public streams are fetched
/// concurrently, then the configuration of every listed stream is fetched
/// concurrently.
///
/// # Returns
/// A vector of `(stream id, stream configuration)` pairs, sorted by stream id.
///
/// # Errors
/// Propagates the first error returned by any of the list or configuration
/// requests.
pub async fn managed_stream_configurations(&self) -> DshApiResult<Vec<(ManagedStreamId, Stream)>> {
    let (internal_ids, public_ids) = try_join!(self.get_stream_internals(), self.get_stream_publics())?;
    let (internal_streams, public_streams) = try_join!(
        try_join_all(
            internal_ids
                .iter()
                .map(|managed_stream_id| self.get_stream_internal_configuration(managed_stream_id.as_str()))
        ),
        try_join_all(
            public_ids
                .iter()
                .map(|managed_stream_id| self.get_stream_public_configuration(managed_stream_id.as_str()))
        ),
    )?;
    // `try_join_all` yields its results in input order, so each id can be
    // zipped with its configuration without any intermediate vectors.
    let internal_pairs = internal_ids.into_iter().zip(internal_streams.into_iter().map(Stream::internal));
    let public_pairs = public_ids.into_iter().zip(public_streams.into_iter().map(Stream::public));
    let mut tuples: Vec<(ManagedStreamId, Stream)> = internal_pairs.chain(public_pairs).collect();
    tuples.sort_by(|(stream_id_a, _), (stream_id_b, _)| stream_id_a.cmp(stream_id_b));
    Ok(tuples)
}
/// Returns the configurations of all internal managed streams.
///
/// The identifiers of the internal streams are fetched first, after which the
/// configuration of every listed stream is fetched concurrently.
///
/// # Returns
/// A vector of `(stream id, internal stream configuration)` pairs, sorted by
/// stream id.
///
/// # Errors
/// Propagates the error of the list request or the first error returned by any
/// of the configuration requests.
pub async fn managed_stream_configurations_internal(&self) -> DshApiResult<Vec<(ManagedStreamId, ManagedStream)>> {
    let ids = self.get_stream_internals().await?;
    let configuration_requests = ids.iter().map(|id| self.get_stream_internal_configuration(id.as_str()));
    let configurations = try_join_all(configuration_requests).await?;
    let mut pairs: Vec<_> = ids.into_iter().zip(configurations).collect();
    pairs.sort_by(|left, right| left.0.cmp(&right.0));
    Ok(pairs)
}
/// Returns the configurations of all public managed streams.
///
/// The identifiers of the public streams are fetched first, after which the
/// configuration of every listed stream is fetched concurrently.
///
/// # Returns
/// A vector of `(stream id, public stream configuration)` pairs, sorted by
/// stream id.
///
/// # Errors
/// Propagates the error of the list request or the first error returned by any
/// of the configuration requests.
pub async fn managed_stream_configurations_public(&self) -> DshApiResult<Vec<(ManagedStreamId, PublicManagedStream)>> {
    let ids = self.get_stream_publics().await?;
    let configuration_requests = ids.iter().map(|id| self.get_stream_public_configuration(id.as_str()));
    let configurations = try_join_all(configuration_requests).await?;
    let mut pairs: Vec<_> = ids.into_iter().zip(configurations).collect();
    pairs.sort_by(|left, right| left.0.cmp(&right.0));
    Ok(pairs)
}
/// Grants a tenant access rights on a managed stream.
///
/// The stream configuration is looked up first to find out whether the stream
/// is internal or public; the matching `put` request(s) are issued afterwards.
/// For `AccessRights::ReadWrite` the read and write grants are issued
/// concurrently.
///
/// # Parameters
/// * `managed_stream_id` - identifier of the managed stream
/// * `managed_tenant_id` - identifier of the tenant that receives the rights
/// * `access_rights` - rights to grant
///
/// # Returns
/// The stream configuration that was fetched before the rights were granted.
///
/// # Errors
/// * `DshApiError::NotFound` - the managed stream does not exist
/// * any error from the configuration lookup or from a `put` request
///
/// NOTE(review): for `ReadWrite` the first failing request aborts the call; the
/// other request may already have been applied - confirm whether callers need
/// to handle such a partial grant.
pub async fn managed_stream_grant_access_rights(&self, managed_stream_id: &ManagedStreamId, managed_tenant_id: &str, access_rights: &AccessRights) -> DshApiResult<Stream> {
    let stream = match self.managed_stream_configuration(managed_stream_id).await? {
        Some(stream) => stream,
        None => return Err(DshApiError::NotFound { message: Some(format!("managed stream '{}' does not exist", managed_stream_id)) }),
    };
    match (&stream, access_rights) {
        (Stream::Internal { .. }, AccessRights::Read) => self.put_stream_internal_access_read(managed_stream_id.as_str(), managed_tenant_id).await?,
        (Stream::Internal { .. }, AccessRights::Write) => self.put_stream_internal_access_write(managed_stream_id.as_str(), managed_tenant_id).await?,
        (Stream::Internal { .. }, AccessRights::ReadWrite) => {
            try_join!(
                self.put_stream_internal_access_read(managed_stream_id.as_str(), managed_tenant_id),
                self.put_stream_internal_access_write(managed_stream_id.as_str(), managed_tenant_id),
            )?;
        }
        (Stream::Public { .. }, AccessRights::Read) => self.put_stream_public_access_read(managed_stream_id.as_str(), managed_tenant_id).await?,
        (Stream::Public { .. }, AccessRights::Write) => self.put_stream_public_access_write(managed_stream_id.as_str(), managed_tenant_id).await?,
        (Stream::Public { .. }, AccessRights::ReadWrite) => {
            try_join!(
                self.put_stream_public_access_read(managed_stream_id.as_str(), managed_tenant_id),
                self.put_stream_public_access_write(managed_stream_id.as_str(), managed_tenant_id),
            )?;
        }
    }
    Ok(stream)
}
/// Revokes a tenant's access rights on a managed stream.
///
/// The stream configuration is looked up first to find out whether the stream
/// is internal or public; the matching `delete` request(s) are issued
/// afterwards. For `AccessRights::ReadWrite` the read and write revocations are
/// issued concurrently.
///
/// # Parameters
/// * `managed_stream_id` - identifier of the managed stream
/// * `managed_tenant_id` - identifier of the tenant that loses the rights
/// * `access_rights` - rights to revoke
///
/// # Returns
/// The stream configuration that was fetched before the rights were revoked.
///
/// # Errors
/// * `DshApiError::NotFound` - the managed stream does not exist
/// * any error from the configuration lookup or from a `delete` request
///
/// NOTE(review): for `ReadWrite` the first failing request aborts the call; the
/// other request may already have been applied - confirm whether callers need
/// to handle such a partial revocation.
pub async fn managed_stream_revoke_access_rights(&self, managed_stream_id: &ManagedStreamId, managed_tenant_id: &str, access_rights: &AccessRights) -> DshApiResult<Stream> {
    let stream = match self.managed_stream_configuration(managed_stream_id).await? {
        Some(stream) => stream,
        None => return Err(DshApiError::NotFound { message: Some(format!("managed stream '{}' does not exist", managed_stream_id)) }),
    };
    match (&stream, access_rights) {
        (Stream::Internal { .. }, AccessRights::Read) => self.delete_stream_internal_access_read(managed_stream_id.as_str(), managed_tenant_id).await?,
        (Stream::Internal { .. }, AccessRights::Write) => self.delete_stream_internal_access_write(managed_stream_id.as_str(), managed_tenant_id).await?,
        (Stream::Internal { .. }, AccessRights::ReadWrite) => {
            try_join!(
                self.delete_stream_internal_access_read(managed_stream_id.as_str(), managed_tenant_id),
                self.delete_stream_internal_access_write(managed_stream_id.as_str(), managed_tenant_id),
            )?;
        }
        (Stream::Public { .. }, AccessRights::Read) => self.delete_stream_public_access_read(managed_stream_id.as_str(), managed_tenant_id).await?,
        (Stream::Public { .. }, AccessRights::Write) => self.delete_stream_public_access_write(managed_stream_id.as_str(), managed_tenant_id).await?,
        (Stream::Public { .. }, AccessRights::ReadWrite) => {
            try_join!(
                self.delete_stream_public_access_read(managed_stream_id.as_str(), managed_tenant_id),
                self.delete_stream_public_access_write(managed_stream_id.as_str(), managed_tenant_id),
            )?;
        }
    }
    Ok(stream)
}
/// Returns all tenants that have access rights on a managed stream.
///
/// The stream configuration is looked up first to find out whether the stream
/// is internal or public; the tenants with read access and the tenants with
/// write access are then fetched concurrently and merged.
///
/// # Parameters
/// * `managed_stream_id` - identifier of the managed stream
///
/// # Returns
/// A vector of `(tenant id, access rights)` pairs, sorted by tenant id, with
/// one entry per tenant. Tenants for which `AccessRights::from` yields `None`
/// are omitted.
///
/// # Errors
/// * `DshApiError::NotFound` - the managed stream does not exist
/// * any error from the configuration lookup or from the access list requests
pub async fn managed_stream_tenants_with_access_rights(&self, managed_stream_id: &ManagedStreamId) -> DshApiResult<Vec<(String, AccessRights)>> {
    let (tenant_ids_reads, tenant_ids_writes) = match self.managed_stream_configuration(managed_stream_id).await? {
        Some(Stream::Internal { .. }) => try_join!(
            self.get_stream_internal_access_reads(managed_stream_id.as_str()),
            self.get_stream_internal_access_writes(managed_stream_id.as_str())
        )?,
        Some(Stream::Public { .. }) => try_join!(
            self.get_stream_public_access_reads(managed_stream_id.as_str()),
            self.get_stream_public_access_writes(managed_stream_id.as_str())
        )?,
        None => return Err(DshApiError::NotFound { message: Some(format!("managed stream '{}' does not exist", managed_stream_id)) }),
    };
    // Sorted, de-duplicated union of the readers and writers, so that a tenant
    // present in both lists is reported exactly once.
    let mut tenant_ids = tenant_ids_reads.iter().chain(&tenant_ids_writes).collect_vec();
    tenant_ids.sort();
    tenant_ids.dedup();
    Ok(
        tenant_ids
            .into_iter()
            .filter_map(|tenant_id| {
                AccessRights::from(tenant_ids_reads.contains(tenant_id), tenant_ids_writes.contains(tenant_id))
                    .map(|access_rights| (tenant_id.clone(), access_rights))
            })
            .collect_vec(),
    )
}
}