Skip to main content

dsh_api/
stream.rs

1//! # Additional methods to manage streams
2//!
3//! _These functions are only available when the `manage` feature is enabled._
4//!
5//! Module that contains methods and functions to manage streams.
6//!
7//! # Generated methods
8//!
9//! [`DshApiClient`] methods that are generated from the `openapi` specification.
10//!
11//! * [`delete_stream_internal_access_read(streamid, tenant)`](DshApiClient::delete_stream_internal_access_read)
12//! * [`delete_stream_internal_access_write(streamid, tenant)`](DshApiClient::delete_stream_internal_access_write)
13//! * [`delete_stream_internal_configuration(streamid)`](DshApiClient::delete_stream_internal_configuration)
14//! * [`delete_stream_public_access_read(streamid, tenant)`](DshApiClient::delete_stream_public_access_read)
15//! * [`delete_stream_public_access_write(streamid, tenant)`](DshApiClient::delete_stream_public_access_write)
16//! * [`delete_stream_public_configuration(streamid)`](DshApiClient::delete_stream_public_configuration)
17//! * [`get_stream_internal_access_reads(streamid) -> [id]`](DshApiClient::get_stream_internal_access_reads)
18//! * [`get_stream_internal_access_writes(streamid) -> [id]`](DshApiClient::get_stream_internal_access_writes)
19//! * [`get_stream_internal_configuration(streamid) -> ManagedStream`](DshApiClient::get_stream_internal_configuration)
20//! * [`get_stream_internals() -> [ManagedStreamId]`](DshApiClient::get_stream_internals)
21//! * [`get_stream_public_access_reads(streamid) -> [id]`](DshApiClient::get_stream_public_access_reads)
22//! * [`get_stream_public_access_writes(streamid) -> [id]`](DshApiClient::get_stream_public_access_writes)
23//! * [`get_stream_public_configuration(streamid) -> PublicManagedStream`](DshApiClient::get_stream_public_configuration)
24//! * [`get_stream_publics() -> [ManagedStreamId]`](DshApiClient::get_stream_publics)
25//! * [`head_stream_internal_access_read(streamid, tenant)`](DshApiClient::head_stream_internal_access_read)
26//! * [`head_stream_internal_access_write(streamid, tenant)`](DshApiClient::head_stream_internal_access_write)
27//! * [`head_stream_public_access_read(streamid, tenant)`](DshApiClient::head_stream_public_access_read)
28//! * [`head_stream_public_access_write(streamid, tenant)`](DshApiClient::head_stream_public_access_write)
29//! * [`post_stream_internal_configuration(streamid, body)`](DshApiClient::post_stream_internal_configuration)
30//! * [`post_stream_public_configuration(streamid, body)`](DshApiClient::post_stream_public_configuration)
31//! * [`put_stream_internal_access_read(streamid, tenant)`](DshApiClient::put_stream_internal_access_read)
32//! * [`put_stream_internal_access_write(streamid, tenant)`](DshApiClient::put_stream_internal_access_write)
33//! * [`put_stream_public_access_read(streamid, tenant)`](DshApiClient::put_stream_public_access_read)
34//! * [`put_stream_public_access_write(streamid, tenant)`](DshApiClient::put_stream_public_access_write)
35//!
36//! # Derived methods
37//!
38//! [`DshApiClient`] methods that add extra capabilities but do not directly call the
39//! DSH resource management API. These derived methods depend on the API methods for this.
40//!
41//! * [`managed_stream_access_rights(stream id, tenant name) -> rights`](DshApiClient::managed_stream_access_rights)
42//! * [`managed_stream_configuration(stream id) -> stream`](DshApiClient::managed_stream_configuration)
43//! * [`managed_stream_configurations() -> [(stream id, stream)]`](DshApiClient::managed_stream_configurations)
44//! * [`managed_stream_configurations_internal() -> [(stream id, stream)]`](DshApiClient::managed_stream_configurations_internal)
45//! * [`managed_stream_configurations_public() -> [(stream id, stream)]`](DshApiClient::managed_stream_configurations_public)
46//! * [`managed_stream_grant_access_rights(stream id, tenant name, rights) -> stream`](DshApiClient::managed_stream_grant_access_rights)
47//! * [`managed_stream_revoke_access_rights(stream id, tenant name, rights) -> stream`](DshApiClient::managed_stream_revoke_access_rights)
48//! * [`managed_stream_tenants_with_access_rights(stream id) -> [(tenant name, rights)]`](DshApiClient::managed_stream_tenants_with_access_rights)
49
50use crate::dsh_api_client::DshApiClient;
51use crate::types::{ManagedStream, ManagedStreamId, PublicManagedStream};
52use crate::{AccessRights, DshApiError, DshApiResult};
53use futures::future::try_join_all;
54use futures::{join, try_join};
55use itertools::Itertools;
56use serde::{Deserialize, Serialize};
57use std::fmt::{Debug, Display, Formatter};
58
59/// Managed stream, either internal or public.
60#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
61pub enum Stream {
62  Internal { internal_stream: ManagedStream },
63  Public { public_stream: PublicManagedStream },
64}
65
66impl Stream {
67  pub(crate) fn internal<T>(internal_stream: T) -> Self
68  where
69    T: Into<ManagedStream>,
70  {
71    Self::Internal { internal_stream: internal_stream.into() }
72  }
73
74  pub(crate) fn public<T>(public_stream: T) -> Self
75  where
76    T: Into<PublicManagedStream>,
77  {
78    Self::Public { public_stream: public_stream.into() }
79  }
80}
81
82impl Display for Stream {
83  fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
84    match self {
85      Stream::Internal { internal_stream } => Display::fmt(&internal_stream, f),
86      Stream::Public { public_stream } => Display::fmt(public_stream, f),
87    }
88  }
89}
90
91/// # Additional methods and functions to manage streams
92///
93/// _These functions are only available when the `manage` feature is enabled._
94///
95/// Module that contains methods and functions to manage internal and public streams.
96/// * Derived methods - DshApiClient methods that add extra capabilities
97///   but depend on the API methods.
98/// * Functions - Functions that add extra capabilities but do not depend directly on the API.
99///
100/// # Derived methods
101///
102/// [`DshApiClient`] methods that add extra capabilities but do not directly call the
103/// DSH resource management API. These derived methods depend on the API methods for this.
104///
105/// * [`managed_stream_access_rights(stream id, tenant name) -> rights`](DshApiClient::managed_stream_access_rights)
106/// * [`managed_stream_configuration(stream id) -> stream`](DshApiClient::managed_stream_configuration)
107/// * [`managed_stream_configurations() -> [(stream id, stream)]`](DshApiClient::managed_stream_configurations)
108/// * [`managed_stream_configurations_internal() -> [(stream id, stream)]`](DshApiClient::managed_stream_configurations_internal)
109/// * [`managed_stream_configurations_public() -> [(stream id, stream)]`](DshApiClient::managed_stream_configurations_public)
110/// * [`managed_stream_grant_access_rights(stream id, tenant name, rights) -> stream`](DshApiClient::managed_stream_grant_access_rights)
111/// * [`managed_stream_revoke_access_rights(stream id, tenant name, rights) -> stream`](DshApiClient::managed_stream_revoke_access_rights)
112/// * [`managed_stream_tenants_with_access_rights(stream id) -> [(tenant name, rights)]`](DshApiClient::managed_stream_tenants_with_access_rights)
113impl DshApiClient {
114  /// Check whether a managed tenant has access to a managed stream.
115  ///
116  /// Note that this method will return `Ok(None)` when either the managed tenant
117  /// or the managed stream does not exist.
118  ///
119  /// # Parameters
120  /// * `stream_id` - managed stream id
121  /// * `tenant_id` - managed tenant id
122  ///
123  /// # Returns
124  /// * `Ok(Some(AccessRights::Read))` - when the managed tenant has read access to the managed stream
125  /// * `Ok(Some(AccessRights::ReadWrite))` - when the managed tenant has both read and write access to the managed stream
126  /// * `Ok(Some(AccessRights::Write))` - when the managed tenant has write access to the managed stream
127  /// * `Ok(None)` - when the managed tenant does not have access to the managed stream,
128  ///   or when the managed stream or the managed tenant does not exist
129  /// * `Err<DshApiError>` - when the request could not be processed by the DSH
130  pub async fn managed_stream_access_rights(&self, stream_id: &ManagedStreamId, tenant_id: &str) -> DshApiResult<Option<AccessRights>> {
131    let (internal_read_access, internal_write_access, public_read_access, public_write_access) = try_join!(
132      self.managed_tenant_has_internal_read_access(tenant_id, stream_id),
133      self.managed_tenant_has_internal_write_access(tenant_id, stream_id),
134      self.managed_tenant_has_public_read_access(tenant_id, stream_id),
135      self.managed_tenant_has_public_write_access(tenant_id, stream_id)
136    )?;
137    match (internal_read_access, internal_write_access) {
138      (false, false) => match (public_read_access, public_write_access) {
139        (false, false) => Ok(None),
140        (false, true) => Ok(Some(AccessRights::Write)),
141        (true, false) => Ok(Some(AccessRights::Read)),
142        (true, true) => Ok(Some(AccessRights::ReadWrite)),
143      },
144      (false, true) => Ok(Some(AccessRights::Write)),
145      (true, false) => Ok(Some(AccessRights::Read)),
146      (true, true) => Ok(Some(AccessRights::ReadWrite)),
147    }
148  }
149
150  /// Get internal or public managed stream configuration.
151  ///
152  /// # Parameters
153  /// * `managed_stream_id` - managed stream identifier
154  ///
155  /// # Returns
156  /// * `Ok<Stream::Internal>` - when request was successful for internal managed stream
157  /// * `Ok<Stream::Public>` - when request was successful for public managed stream
158  /// * `Ok<None>` - when internal and public managed stream with the provided id do not exist
159  /// * `Err<DshApiError>` - when the request could not be processed by the DSH
160  pub async fn managed_stream_configuration(&self, managed_stream_id: &ManagedStreamId) -> DshApiResult<Option<Stream>> {
161    let r = join!(
162      self.get_stream_internal_configuration(managed_stream_id.as_str()),
163      self.get_stream_public_configuration(managed_stream_id.as_str())
164    );
165    match r {
166      (Err(internal_stream_error), Err(public_stream_error)) => match (internal_stream_error, public_stream_error) {
167        (DshApiError::NotFound { .. }, DshApiError::NotFound { .. }) => Ok(None),
168        (internal_stream_error, DshApiError::NotFound { .. }) => Err(internal_stream_error),
169        (DshApiError::NotFound { .. }, public_stream_error) => Err(public_stream_error),
170        (internal_stream_error, _) => Err(internal_stream_error),
171      },
172      (Ok(internal_stream), Err(public_stream_error)) => match public_stream_error {
173        DshApiError::NotFound { .. } => Ok(Some(Stream::Internal { internal_stream })),
174        error => Err(error),
175      },
176      (Err(internal_stream_error), Ok(public_stream)) => match internal_stream_error {
177        DshApiError::NotFound { .. } => Ok(Some(Stream::Public { public_stream })),
178        error => Err(error),
179      },
180      (Ok(_), Ok(_)) => Err(DshApiError::Unexpected { message: format!("both internal and public managed stream '{}' exist", managed_stream_id), cause: None }),
181    }
182  }
183
184  /// Get managed stream configurations.
185  ///
186  /// Returns a list of (stream id, stream)-tuples containing the ids and configurations
187  /// of the available internal or public managed streams.
188  /// When there are no managed streams, an empty list will be returned.
189  /// The list will be sorted by stream id.
190  ///
191  /// # Returns
192  /// * `Ok<Vec<(ManagedStreamId, Stream)>>` - when request was successful
193  /// * `Err<DshApiError>` - when the request could not be processed by the DSH
194  pub async fn managed_stream_configurations(&self) -> DshApiResult<Vec<(ManagedStreamId, Stream)>> {
195    let (internal_ids, public_ids) = try_join!(self.get_stream_internals(), self.get_stream_publics())?;
196    let (internal_streams, public_streams) = try_join!(
197      try_join_all(
198        internal_ids
199          .iter()
200          .map(|managed_stream_id| self.get_stream_internal_configuration(managed_stream_id.as_str()))
201      ),
202      try_join_all(
203        public_ids
204          .iter()
205          .map(|managed_stream_id| self.get_stream_public_configuration(managed_stream_id.as_str()))
206      ),
207    )?;
208    let mut tuples: Vec<(ManagedStreamId, Stream)> = internal_ids
209      .into_iter()
210      .zip(internal_streams.into_iter().map(Stream::internal).collect_vec())
211      .collect_vec();
212    tuples.append(
213      &mut public_ids
214        .into_iter()
215        .zip(public_streams.into_iter().map(Stream::public).collect_vec())
216        .collect_vec(),
217    );
218    tuples.sort_by(|(stream_id_a, _), (stream_id_b, _)| stream_id_a.cmp(stream_id_b));
219    Ok(tuples)
220  }
221
222  /// Get internal managed stream configurations.
223  ///
224  /// Returns a list of (stream id, stream)-tuples containing the ids and configurations
225  /// of the available internal managed streams.
226  /// When there are no internal managed streams, an empty list will be returned.
227  /// The list will be sorted by stream id.
228  ///
229  /// # Returns
230  /// * `Ok<Vec<(ManagedStreamId, ManagedStream)>>` - when request was successful
231  /// * `Err<DshApiError>` - when the request could not be processed by the DSH
232  pub async fn managed_stream_configurations_internal(&self) -> DshApiResult<Vec<(ManagedStreamId, ManagedStream)>> {
233    let internal_stream_ids = self.get_stream_internals().await?;
234    let internal_streams = try_join_all(
235      internal_stream_ids
236        .iter()
237        .map(|stream_id| self.get_stream_internal_configuration(stream_id.as_str())),
238    )
239    .await?;
240    let mut tuples = internal_stream_ids.into_iter().zip(internal_streams).collect_vec();
241    tuples.sort_by(|(stream_id_a, _), (stream_id_b, _)| stream_id_a.cmp(stream_id_b));
242    Ok(tuples)
243  }
244
245  /// #Get public managed stream configurations.
246  ///
247  /// Returns a list of (stream id, stream)-tuples containing the ids and configurations
248  /// of the available public managed streams.
249  /// When there are no public managed streams, an empty list will be returned.
250  /// The list will be sorted by stream id.
251  ///
252  /// # Returns
253  /// * `Ok<Vec<(ManagedStreamId, PublicManagedStream)>>` - when request was successful
254  /// * `Err<DshApiError>` - when the request could not be processed by the DSH
255  pub async fn managed_stream_configurations_public(&self) -> DshApiResult<Vec<(ManagedStreamId, PublicManagedStream)>> {
256    let public_stream_ids = self.get_stream_publics().await?;
257    let public_streams = try_join_all(public_stream_ids.iter().map(|stream_id| self.get_stream_public_configuration(stream_id.as_str()))).await?;
258    let mut tuples = public_stream_ids.into_iter().zip(public_streams).collect_vec();
259    tuples.sort_by(|(stream_id_a, _), (stream_id_b, _)| stream_id_a.cmp(stream_id_b));
260    Ok(tuples)
261  }
262
263  /// Grant managed stream access rights to managed tenant.
264  ///
265  /// # Parameters
266  /// * `managed_stream_id` - internal or public managed stream to grant access rights to
267  /// * `managed_tenant_id` - managed tenant which is granted access rights
268  /// * `access_rights` - read, read/write or write access rights
269  ///
270  /// # Returns
271  /// * `Ok<Stream>` - when request was successfully made the internal or public managed stream
272  ///   will be returned
273  /// * `Err<DshApiError>` - when the managed stream does not exist or the request
274  ///   could not be processed by the DSH
275  pub async fn managed_stream_grant_access_rights(&self, managed_stream_id: &ManagedStreamId, managed_tenant_id: &str, access_rights: &AccessRights) -> DshApiResult<Stream> {
276    match self.managed_stream_configuration(managed_stream_id).await? {
277      Some(Stream::Internal { internal_stream }) => {
278        match access_rights {
279          AccessRights::Read => self.put_stream_internal_access_read(managed_stream_id.as_str(), managed_tenant_id).await?,
280          AccessRights::ReadWrite => {
281            try_join!(
282              self.put_stream_internal_access_read(managed_stream_id.as_str(), managed_tenant_id),
283              self.put_stream_internal_access_write(managed_stream_id.as_str(), managed_tenant_id),
284            )?;
285          }
286          AccessRights::Write => self.put_stream_internal_access_write(managed_stream_id.as_str(), managed_tenant_id).await?,
287        }
288        Ok(Stream::Internal { internal_stream })
289      }
290      Some(Stream::Public { public_stream }) => {
291        match access_rights {
292          AccessRights::Read => self.put_stream_public_access_read(managed_stream_id.as_str(), managed_tenant_id).await?,
293          AccessRights::ReadWrite => {
294            try_join!(
295              self.put_stream_public_access_read(managed_stream_id.as_str(), managed_tenant_id),
296              self.put_stream_public_access_write(managed_stream_id.as_str(), managed_tenant_id),
297            )?;
298          }
299          AccessRights::Write => self.put_stream_public_access_write(managed_stream_id.as_str(), managed_tenant_id).await?,
300        }
301        Ok(Stream::Public { public_stream })
302      }
303      None => Err(DshApiError::NotFound { message: Some(format!("managed stream '{}' does not exist", managed_stream_id)) }),
304    }
305  }
306
307  /// #Revoke managed stream access rights from managed tenant.
308  ///
309  /// # Parameters
310  /// * `managed_stream_id` - internal or public managed stream to revoke access rights from
311  /// * `managed_tenant_id` - managed tenant from which access rights are revoked
312  /// * `access_rights` - read, read/write or write access rights
313  ///
314  /// # Returns
315  /// * `Ok<Stream>` - when request was successfully made the internal or public managed stream
316  ///   will be returned
317  /// * `Err<DshApiError>` - when the managed stream does not exist or the request
318  ///   could not be processed by the DSH
319  pub async fn managed_stream_revoke_access_rights(&self, managed_stream_id: &ManagedStreamId, managed_tenant_id: &str, access_rights: &AccessRights) -> DshApiResult<Stream> {
320    match self.managed_stream_configuration(managed_stream_id).await? {
321      Some(Stream::Internal { internal_stream }) => {
322        match access_rights {
323          AccessRights::Read => self.delete_stream_internal_access_read(managed_stream_id.as_str(), managed_tenant_id).await?,
324          AccessRights::ReadWrite => {
325            try_join!(
326              self.delete_stream_internal_access_read(managed_stream_id.as_str(), managed_tenant_id),
327              self.delete_stream_internal_access_write(managed_stream_id.as_str(), managed_tenant_id),
328            )?;
329          }
330          AccessRights::Write => self.delete_stream_internal_access_write(managed_stream_id.as_str(), managed_tenant_id).await?,
331        }
332        Ok(Stream::Internal { internal_stream })
333      }
334      Some(Stream::Public { public_stream }) => {
335        match access_rights {
336          AccessRights::Read => self.delete_stream_public_access_read(managed_stream_id.as_str(), managed_tenant_id).await?,
337          AccessRights::ReadWrite => {
338            try_join!(
339              self.delete_stream_public_access_read(managed_stream_id.as_str(), managed_tenant_id),
340              self.delete_stream_public_access_write(managed_stream_id.as_str(), managed_tenant_id),
341            )?;
342          }
343          AccessRights::Write => self.delete_stream_public_access_write(managed_stream_id.as_str(), managed_tenant_id).await?,
344        }
345        Ok(Stream::Public { public_stream })
346      }
347      None => Err(DshApiError::NotFound { message: Some(format!("managed stream '{}' does not exist", managed_stream_id)) }),
348    }
349  }
350
351  /// Get tenants that have been granted access rights.
352  ///
353  /// # Parameters
354  /// * `managed_stream_id` - internal or public managed stream to get granted rights for
355  ///
356  /// # Returns
357  /// * `Ok<Vec<(String, AccessRights)>` - tuples consisting of tenant ids and granted access rights
358  /// * `Err<DshApiError>` - when the managed stream does not exist or the request
359  ///   could not be processed by the DSH
360  pub async fn managed_stream_tenants_with_access_rights(&self, managed_stream_id: &ManagedStreamId) -> DshApiResult<Vec<(String, AccessRights)>> {
361    let (tenant_ids_reads, tenant_ids_writes) = match self.managed_stream_configuration(managed_stream_id).await? {
362      Some(stream_configuration) => match stream_configuration {
363        Stream::Internal { .. } => try_join!(
364          self.get_stream_internal_access_reads(managed_stream_id.as_str()),
365          self.get_stream_internal_access_writes(managed_stream_id.as_str())
366        )?,
367        Stream::Public { .. } => try_join!(
368          self.get_stream_public_access_reads(managed_stream_id.as_str()),
369          self.get_stream_public_access_writes(managed_stream_id.as_str())
370        )?,
371      },
372      None => return Err(DshApiError::NotFound { message: Some(format!("managed stream '{}' does not exist", managed_stream_id)) }),
373    };
374    let mut tenant_ids = tenant_ids_reads.iter().collect_vec();
375    for id in &tenant_ids_writes {
376      tenant_ids.push(id);
377    }
378    tenant_ids.sort();
379    tenant_ids.dedup();
380    Ok(
381      tenant_ids
382        .into_iter()
383        .filter_map(|tenant_id| {
384          AccessRights::from(tenant_ids_reads.contains(tenant_id), tenant_ids_writes.contains(tenant_id)).map(|acess_rights| (tenant_id.clone(), acess_rights))
385        })
386        .collect_vec(),
387    )
388  }
389}