dsh_api/stream.rs
1//! # Additional methods to manage streams
2//!
3//! _These functions are only available when the `manage` feature is enabled._
4//!
5//! Module that contains methods and functions to manage streams.
6//!
7//! # Generated methods
8//!
9//! [`DshApiClient`] methods that are generated from the `openapi` specification.
10//!
11//! * [`delete_stream_internal_access_read(streamid, tenant)`](DshApiClient::delete_stream_internal_access_read)
12//! * [`delete_stream_internal_access_write(streamid, tenant)`](DshApiClient::delete_stream_internal_access_write)
13//! * [`delete_stream_internal_configuration(streamid)`](DshApiClient::delete_stream_internal_configuration)
14//! * [`delete_stream_public_access_read(streamid, tenant)`](DshApiClient::delete_stream_public_access_read)
15//! * [`delete_stream_public_access_write(streamid, tenant)`](DshApiClient::delete_stream_public_access_write)
16//! * [`delete_stream_public_configuration(streamid)`](DshApiClient::delete_stream_public_configuration)
17//! * [`get_stream_internal_access_reads(streamid) -> [id]`](DshApiClient::get_stream_internal_access_reads)
18//! * [`get_stream_internal_access_writes(streamid) -> [id]`](DshApiClient::get_stream_internal_access_writes)
19//! * [`get_stream_internal_configuration(streamid) -> ManagedStream`](DshApiClient::get_stream_internal_configuration)
20//! * [`get_stream_internals() -> [ManagedStreamId]`](DshApiClient::get_stream_internals)
21//! * [`get_stream_public_access_reads(streamid) -> [id]`](DshApiClient::get_stream_public_access_reads)
22//! * [`get_stream_public_access_writes(streamid) -> [id]`](DshApiClient::get_stream_public_access_writes)
23//! * [`get_stream_public_configuration(streamid) -> PublicManagedStream`](DshApiClient::get_stream_public_configuration)
24//! * [`get_stream_publics() -> [ManagedStreamId]`](DshApiClient::get_stream_publics)
25//! * [`head_stream_internal_access_read(streamid, tenant)`](DshApiClient::head_stream_internal_access_read)
26//! * [`head_stream_internal_access_write(streamid, tenant)`](DshApiClient::head_stream_internal_access_write)
27//! * [`head_stream_public_access_read(streamid, tenant)`](DshApiClient::head_stream_public_access_read)
28//! * [`head_stream_public_access_write(streamid, tenant)`](DshApiClient::head_stream_public_access_write)
29//! * [`post_stream_internal_configuration(streamid, body)`](DshApiClient::post_stream_internal_configuration)
30//! * [`post_stream_public_configuration(streamid, body)`](DshApiClient::post_stream_public_configuration)
31//! * [`put_stream_internal_access_read(streamid, tenant)`](DshApiClient::put_stream_internal_access_read)
32//! * [`put_stream_internal_access_write(streamid, tenant)`](DshApiClient::put_stream_internal_access_write)
33//! * [`put_stream_public_access_read(streamid, tenant)`](DshApiClient::put_stream_public_access_read)
34//! * [`put_stream_public_access_write(streamid, tenant)`](DshApiClient::put_stream_public_access_write)
35//!
36//! # Derived methods
37//!
38//! [`DshApiClient`] methods that add extra capabilities but do not directly call the
39//! DSH resource management API. These derived methods depend on the API methods for this.
40//!
41//! * [`managed_stream_access_rights(stream id, tenant name) -> rights`](DshApiClient::managed_stream_access_rights)
42//! * [`managed_stream_configuration(stream id) -> stream`](DshApiClient::managed_stream_configuration)
43//! * [`managed_stream_configurations() -> [(stream id, stream)]`](DshApiClient::managed_stream_configurations)
44//! * [`managed_stream_configurations_internal() -> [(stream id, stream)]`](DshApiClient::managed_stream_configurations_internal)
45//! * [`managed_stream_configurations_public() -> [(stream id, stream)]`](DshApiClient::managed_stream_configurations_public)
46//! * [`managed_stream_grant_access_rights(stream id, tenant name, rights) -> stream`](DshApiClient::managed_stream_grant_access_rights)
47//! * [`managed_stream_revoke_access_rights(stream id, tenant name, rights) -> stream`](DshApiClient::managed_stream_revoke_access_rights)
48//! * [`managed_stream_tenants_with_access_rights(stream id) -> [(tenant name, rights)]`](DshApiClient::managed_stream_tenants_with_access_rights)
49
50use crate::dsh_api_client::DshApiClient;
51use crate::types::{ManagedStream, ManagedStreamId, PublicManagedStream};
52use crate::{AccessRights, DshApiError, DshApiResult};
53use futures::future::try_join_all;
54use futures::{join, try_join};
55use itertools::Itertools;
56use serde::{Deserialize, Serialize};
57use std::fmt::{Debug, Display, Formatter};
58
/// Managed stream, either internal or public.
///
/// Wraps the configuration of an internal managed stream ([`ManagedStream`]) or of a
/// public managed stream ([`PublicManagedStream`]) in a single type, so that methods
/// that can return either kind have one return type.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub enum Stream {
  /// Configuration of an internal managed stream.
  Internal { internal_stream: ManagedStream },
  /// Configuration of a public managed stream.
  Public { public_stream: PublicManagedStream },
}
65
66impl Stream {
67 pub(crate) fn internal<T>(internal_stream: T) -> Self
68 where
69 T: Into<ManagedStream>,
70 {
71 Self::Internal { internal_stream: internal_stream.into() }
72 }
73
74 pub(crate) fn public<T>(public_stream: T) -> Self
75 where
76 T: Into<PublicManagedStream>,
77 {
78 Self::Public { public_stream: public_stream.into() }
79 }
80}
81
82impl Display for Stream {
83 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
84 match self {
85 Stream::Internal { internal_stream } => Display::fmt(&internal_stream, f),
86 Stream::Public { public_stream } => Display::fmt(public_stream, f),
87 }
88 }
89}
90
91/// # Additional methods and functions to manage streams
92///
93/// _These functions are only available when the `manage` feature is enabled._
94///
95/// Module that contains methods and functions to manage internal and public streams.
96/// * Derived methods - DshApiClient methods that add extra capabilities
97/// but depend on the API methods.
98/// * Functions - Functions that add extra capabilities but do not depend directly on the API.
99///
100/// # Derived methods
101///
102/// [`DshApiClient`] methods that add extra capabilities but do not directly call the
103/// DSH resource management API. These derived methods depend on the API methods for this.
104///
105/// * [`managed_stream_access_rights(stream id, tenant name) -> rights`](DshApiClient::managed_stream_access_rights)
106/// * [`managed_stream_configuration(stream id) -> stream`](DshApiClient::managed_stream_configuration)
107/// * [`managed_stream_configurations() -> [(stream id, stream)]`](DshApiClient::managed_stream_configurations)
108/// * [`managed_stream_configurations_internal() -> [(stream id, stream)]`](DshApiClient::managed_stream_configurations_internal)
109/// * [`managed_stream_configurations_public() -> [(stream id, stream)]`](DshApiClient::managed_stream_configurations_public)
110/// * [`managed_stream_grant_access_rights(stream id, tenant name, rights) -> stream`](DshApiClient::managed_stream_grant_access_rights)
111/// * [`managed_stream_revoke_access_rights(stream id, tenant name, rights) -> stream`](DshApiClient::managed_stream_revoke_access_rights)
112/// * [`managed_stream_tenants_with_access_rights(stream id) -> [(tenant name, rights)]`](DshApiClient::managed_stream_tenants_with_access_rights)
113impl DshApiClient {
114 /// Check whether a managed tenant has access to a managed stream.
115 ///
116 /// Note that this method will return `Ok(None)` when either the managed tenant
117 /// or the managed stream does not exist.
118 ///
119 /// # Parameters
120 /// * `stream_id` - managed stream id
121 /// * `tenant_id` - managed tenant id
122 ///
123 /// # Returns
124 /// * `Ok(Some(AccessRights::Read))` - when the managed tenant has read access to the managed stream
125 /// * `Ok(Some(AccessRights::ReadWrite))` - when the managed tenant has both read and write access to the managed stream
126 /// * `Ok(Some(AccessRights::Write))` - when the managed tenant has write access to the managed stream
127 /// * `Ok(None)` - when the managed tenant does not have access to the managed stream,
128 /// or when the managed stream or the managed tenant does not exist
129 /// * `Err<DshApiError>` - when the request could not be processed by the DSH
130 pub async fn managed_stream_access_rights(&self, stream_id: &ManagedStreamId, tenant_id: &str) -> DshApiResult<Option<AccessRights>> {
131 let (internal_read_access, internal_write_access, public_read_access, public_write_access) = try_join!(
132 self.managed_tenant_has_internal_read_access(tenant_id, stream_id),
133 self.managed_tenant_has_internal_write_access(tenant_id, stream_id),
134 self.managed_tenant_has_public_read_access(tenant_id, stream_id),
135 self.managed_tenant_has_public_write_access(tenant_id, stream_id)
136 )?;
137 match (internal_read_access, internal_write_access) {
138 (false, false) => match (public_read_access, public_write_access) {
139 (false, false) => Ok(None),
140 (false, true) => Ok(Some(AccessRights::Write)),
141 (true, false) => Ok(Some(AccessRights::Read)),
142 (true, true) => Ok(Some(AccessRights::ReadWrite)),
143 },
144 (false, true) => Ok(Some(AccessRights::Write)),
145 (true, false) => Ok(Some(AccessRights::Read)),
146 (true, true) => Ok(Some(AccessRights::ReadWrite)),
147 }
148 }
149
150 /// Get internal or public managed stream configuration.
151 ///
152 /// # Parameters
153 /// * `managed_stream_id` - managed stream identifier
154 ///
155 /// # Returns
156 /// * `Ok<Stream::Internal>` - when request was successful for internal managed stream
157 /// * `Ok<Stream::Public>` - when request was successful for public managed stream
158 /// * `Ok<None>` - when internal and public managed stream with the provided id do not exist
159 /// * `Err<DshApiError>` - when the request could not be processed by the DSH
160 pub async fn managed_stream_configuration(&self, managed_stream_id: &ManagedStreamId) -> DshApiResult<Option<Stream>> {
161 let r = join!(
162 self.get_stream_internal_configuration(managed_stream_id.as_str()),
163 self.get_stream_public_configuration(managed_stream_id.as_str())
164 );
165 match r {
166 (Err(internal_stream_error), Err(public_stream_error)) => match (internal_stream_error, public_stream_error) {
167 (DshApiError::NotFound { .. }, DshApiError::NotFound { .. }) => Ok(None),
168 (internal_stream_error, DshApiError::NotFound { .. }) => Err(internal_stream_error),
169 (DshApiError::NotFound { .. }, public_stream_error) => Err(public_stream_error),
170 (internal_stream_error, _) => Err(internal_stream_error),
171 },
172 (Ok(internal_stream), Err(public_stream_error)) => match public_stream_error {
173 DshApiError::NotFound { .. } => Ok(Some(Stream::Internal { internal_stream })),
174 error => Err(error),
175 },
176 (Err(internal_stream_error), Ok(public_stream)) => match internal_stream_error {
177 DshApiError::NotFound { .. } => Ok(Some(Stream::Public { public_stream })),
178 error => Err(error),
179 },
180 (Ok(_), Ok(_)) => Err(DshApiError::Unexpected { message: format!("both internal and public managed stream '{}' exist", managed_stream_id), cause: None }),
181 }
182 }
183
184 /// Get managed stream configurations.
185 ///
186 /// Returns a list of (stream id, stream)-tuples containing the ids and configurations
187 /// of the available internal or public managed streams.
188 /// When there are no managed streams, an empty list will be returned.
189 /// The list will be sorted by stream id.
190 ///
191 /// # Returns
192 /// * `Ok<Vec<(ManagedStreamId, Stream)>>` - when request was successful
193 /// * `Err<DshApiError>` - when the request could not be processed by the DSH
194 pub async fn managed_stream_configurations(&self) -> DshApiResult<Vec<(ManagedStreamId, Stream)>> {
195 let (internal_ids, public_ids) = try_join!(self.get_stream_internals(), self.get_stream_publics())?;
196 let (internal_streams, public_streams) = try_join!(
197 try_join_all(
198 internal_ids
199 .iter()
200 .map(|managed_stream_id| self.get_stream_internal_configuration(managed_stream_id.as_str()))
201 ),
202 try_join_all(
203 public_ids
204 .iter()
205 .map(|managed_stream_id| self.get_stream_public_configuration(managed_stream_id.as_str()))
206 ),
207 )?;
208 let mut tuples: Vec<(ManagedStreamId, Stream)> = internal_ids
209 .into_iter()
210 .zip(internal_streams.into_iter().map(Stream::internal).collect_vec())
211 .collect_vec();
212 tuples.append(
213 &mut public_ids
214 .into_iter()
215 .zip(public_streams.into_iter().map(Stream::public).collect_vec())
216 .collect_vec(),
217 );
218 tuples.sort_by(|(stream_id_a, _), (stream_id_b, _)| stream_id_a.cmp(stream_id_b));
219 Ok(tuples)
220 }
221
222 /// Get internal managed stream configurations.
223 ///
224 /// Returns a list of (stream id, stream)-tuples containing the ids and configurations
225 /// of the available internal managed streams.
226 /// When there are no internal managed streams, an empty list will be returned.
227 /// The list will be sorted by stream id.
228 ///
229 /// # Returns
230 /// * `Ok<Vec<(ManagedStreamId, ManagedStream)>>` - when request was successful
231 /// * `Err<DshApiError>` - when the request could not be processed by the DSH
232 pub async fn managed_stream_configurations_internal(&self) -> DshApiResult<Vec<(ManagedStreamId, ManagedStream)>> {
233 let internal_stream_ids = self.get_stream_internals().await?;
234 let internal_streams = try_join_all(
235 internal_stream_ids
236 .iter()
237 .map(|stream_id| self.get_stream_internal_configuration(stream_id.as_str())),
238 )
239 .await?;
240 let mut tuples = internal_stream_ids.into_iter().zip(internal_streams).collect_vec();
241 tuples.sort_by(|(stream_id_a, _), (stream_id_b, _)| stream_id_a.cmp(stream_id_b));
242 Ok(tuples)
243 }
244
245 /// #Get public managed stream configurations.
246 ///
247 /// Returns a list of (stream id, stream)-tuples containing the ids and configurations
248 /// of the available public managed streams.
249 /// When there are no public managed streams, an empty list will be returned.
250 /// The list will be sorted by stream id.
251 ///
252 /// # Returns
253 /// * `Ok<Vec<(ManagedStreamId, PublicManagedStream)>>` - when request was successful
254 /// * `Err<DshApiError>` - when the request could not be processed by the DSH
255 pub async fn managed_stream_configurations_public(&self) -> DshApiResult<Vec<(ManagedStreamId, PublicManagedStream)>> {
256 let public_stream_ids = self.get_stream_publics().await?;
257 let public_streams = try_join_all(public_stream_ids.iter().map(|stream_id| self.get_stream_public_configuration(stream_id.as_str()))).await?;
258 let mut tuples = public_stream_ids.into_iter().zip(public_streams).collect_vec();
259 tuples.sort_by(|(stream_id_a, _), (stream_id_b, _)| stream_id_a.cmp(stream_id_b));
260 Ok(tuples)
261 }
262
263 /// Grant managed stream access rights to managed tenant.
264 ///
265 /// # Parameters
266 /// * `managed_stream_id` - internal or public managed stream to grant access rights to
267 /// * `managed_tenant_id` - managed tenant which is granted access rights
268 /// * `access_rights` - read, read/write or write access rights
269 ///
270 /// # Returns
271 /// * `Ok<Stream>` - when request was successfully made the internal or public managed stream
272 /// will be returned
273 /// * `Err<DshApiError>` - when the managed stream does not exist or the request
274 /// could not be processed by the DSH
275 pub async fn managed_stream_grant_access_rights(&self, managed_stream_id: &ManagedStreamId, managed_tenant_id: &str, access_rights: &AccessRights) -> DshApiResult<Stream> {
276 match self.managed_stream_configuration(managed_stream_id).await? {
277 Some(Stream::Internal { internal_stream }) => {
278 match access_rights {
279 AccessRights::Read => self.put_stream_internal_access_read(managed_stream_id.as_str(), managed_tenant_id).await?,
280 AccessRights::ReadWrite => {
281 try_join!(
282 self.put_stream_internal_access_read(managed_stream_id.as_str(), managed_tenant_id),
283 self.put_stream_internal_access_write(managed_stream_id.as_str(), managed_tenant_id),
284 )?;
285 }
286 AccessRights::Write => self.put_stream_internal_access_write(managed_stream_id.as_str(), managed_tenant_id).await?,
287 }
288 Ok(Stream::Internal { internal_stream })
289 }
290 Some(Stream::Public { public_stream }) => {
291 match access_rights {
292 AccessRights::Read => self.put_stream_public_access_read(managed_stream_id.as_str(), managed_tenant_id).await?,
293 AccessRights::ReadWrite => {
294 try_join!(
295 self.put_stream_public_access_read(managed_stream_id.as_str(), managed_tenant_id),
296 self.put_stream_public_access_write(managed_stream_id.as_str(), managed_tenant_id),
297 )?;
298 }
299 AccessRights::Write => self.put_stream_public_access_write(managed_stream_id.as_str(), managed_tenant_id).await?,
300 }
301 Ok(Stream::Public { public_stream })
302 }
303 None => Err(DshApiError::NotFound { message: Some(format!("managed stream '{}' does not exist", managed_stream_id)) }),
304 }
305 }
306
307 /// #Revoke managed stream access rights from managed tenant.
308 ///
309 /// # Parameters
310 /// * `managed_stream_id` - internal or public managed stream to revoke access rights from
311 /// * `managed_tenant_id` - managed tenant from which access rights are revoked
312 /// * `access_rights` - read, read/write or write access rights
313 ///
314 /// # Returns
315 /// * `Ok<Stream>` - when request was successfully made the internal or public managed stream
316 /// will be returned
317 /// * `Err<DshApiError>` - when the managed stream does not exist or the request
318 /// could not be processed by the DSH
319 pub async fn managed_stream_revoke_access_rights(&self, managed_stream_id: &ManagedStreamId, managed_tenant_id: &str, access_rights: &AccessRights) -> DshApiResult<Stream> {
320 match self.managed_stream_configuration(managed_stream_id).await? {
321 Some(Stream::Internal { internal_stream }) => {
322 match access_rights {
323 AccessRights::Read => self.delete_stream_internal_access_read(managed_stream_id.as_str(), managed_tenant_id).await?,
324 AccessRights::ReadWrite => {
325 try_join!(
326 self.delete_stream_internal_access_read(managed_stream_id.as_str(), managed_tenant_id),
327 self.delete_stream_internal_access_write(managed_stream_id.as_str(), managed_tenant_id),
328 )?;
329 }
330 AccessRights::Write => self.delete_stream_internal_access_write(managed_stream_id.as_str(), managed_tenant_id).await?,
331 }
332 Ok(Stream::Internal { internal_stream })
333 }
334 Some(Stream::Public { public_stream }) => {
335 match access_rights {
336 AccessRights::Read => self.delete_stream_public_access_read(managed_stream_id.as_str(), managed_tenant_id).await?,
337 AccessRights::ReadWrite => {
338 try_join!(
339 self.delete_stream_public_access_read(managed_stream_id.as_str(), managed_tenant_id),
340 self.delete_stream_public_access_write(managed_stream_id.as_str(), managed_tenant_id),
341 )?;
342 }
343 AccessRights::Write => self.delete_stream_public_access_write(managed_stream_id.as_str(), managed_tenant_id).await?,
344 }
345 Ok(Stream::Public { public_stream })
346 }
347 None => Err(DshApiError::NotFound { message: Some(format!("managed stream '{}' does not exist", managed_stream_id)) }),
348 }
349 }
350
351 /// Get tenants that have been granted access rights.
352 ///
353 /// # Parameters
354 /// * `managed_stream_id` - internal or public managed stream to get granted rights for
355 ///
356 /// # Returns
357 /// * `Ok<Vec<(String, AccessRights)>` - tuples consisting of tenant ids and granted access rights
358 /// * `Err<DshApiError>` - when the managed stream does not exist or the request
359 /// could not be processed by the DSH
360 pub async fn managed_stream_tenants_with_access_rights(&self, managed_stream_id: &ManagedStreamId) -> DshApiResult<Vec<(String, AccessRights)>> {
361 let (tenant_ids_reads, tenant_ids_writes) = match self.managed_stream_configuration(managed_stream_id).await? {
362 Some(stream_configuration) => match stream_configuration {
363 Stream::Internal { .. } => try_join!(
364 self.get_stream_internal_access_reads(managed_stream_id.as_str()),
365 self.get_stream_internal_access_writes(managed_stream_id.as_str())
366 )?,
367 Stream::Public { .. } => try_join!(
368 self.get_stream_public_access_reads(managed_stream_id.as_str()),
369 self.get_stream_public_access_writes(managed_stream_id.as_str())
370 )?,
371 },
372 None => return Err(DshApiError::NotFound { message: Some(format!("managed stream '{}' does not exist", managed_stream_id)) }),
373 };
374 let mut tenant_ids = tenant_ids_reads.iter().collect_vec();
375 for id in &tenant_ids_writes {
376 tenant_ids.push(id);
377 }
378 tenant_ids.sort();
379 tenant_ids.dedup();
380 Ok(
381 tenant_ids
382 .into_iter()
383 .filter_map(|tenant_id| {
384 AccessRights::from(tenant_ids_reads.contains(tenant_id), tenant_ids_writes.contains(tenant_id)).map(|acess_rights| (tenant_id.clone(), acess_rights))
385 })
386 .collect_vec(),
387 )
388 }
389}