Skip to main content

s2_api/v1/
access.rs

1use s2_common::types::{
2    self,
3    access::{AccessTokenId, AccessTokenIdPrefix},
4    basin::{BasinName, BasinNamePrefix},
5    stream::{StreamName, StreamNamePrefix},
6};
7use serde::{Deserialize, Serialize};
8
9#[derive(Debug, Clone)]
10#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
11pub enum MaybeEmpty<T> {
12    Empty,
13    NonEmpty(T),
14}
15
16impl<T: Serialize> Serialize for MaybeEmpty<T> {
17    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
18    where
19        S: serde::Serializer,
20    {
21        match self {
22            Self::NonEmpty(v) => v.serialize(serializer),
23            Self::Empty => serializer.serialize_str(""),
24        }
25    }
26}
27
28impl<'de, T> Deserialize<'de> for MaybeEmpty<T>
29where
30    T: Deserialize<'de>,
31{
32    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
33    where
34        D: serde::Deserializer<'de>,
35    {
36        let s = String::deserialize(deserializer)?;
37        if s.is_empty() {
38            Ok(MaybeEmpty::Empty)
39        } else {
40            T::deserialize(serde::de::value::StringDeserializer::new(s)).map(MaybeEmpty::NonEmpty)
41        }
42    }
43}
44
45use time::OffsetDateTime;
46
47#[rustfmt::skip]
48#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
49#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
50#[serde(rename_all = "kebab-case")]
51pub enum Operation {
52    /// List basins.
53    ListBasins,
54    /// Create a basin.
55    CreateBasin,
56    /// Delete a basin.
57    DeleteBasin,
58    /// Reconfigure a basin.
59    ReconfigureBasin,
60    /// Get basin configuration.
61    GetBasinConfig,
62    /// Issue an access token.
63    IssueAccessToken,
64    /// Revoke an access token.
65    RevokeAccessToken,
66    /// List access tokens.
67    ListAccessTokens,
68    /// List streams.
69    ListStreams,
70    /// Create a stream.
71    CreateStream,
72    /// Delete a stream.
73    DeleteStream,
74    /// Get stream configuration.
75    GetStreamConfig,
76    /// Reconfigure a stream.
77    ReconfigureStream,
78    /// Check the tail of a stream.
79    CheckTail,
80    /// Append records to a stream.
81    Append,
82    /// Read records from a stream.
83    Read,
84    /// Trim records on a stream.
85    Trim,
86    /// Set the fencing token on a stream.
87    Fence,
88    /// Retrieve account-level metrics.
89    AccountMetrics,
90    /// Retrieve basin-level metrics.
91    BasinMetrics,
92    /// Retrieve stream-level metrics.
93    StreamMetrics,
94    /// List locations.
95    ListLocations,
96    /// Get the default location.
97    GetDefaultLocation,
98    /// Set the default location.
99    SetDefaultLocation,
100}
101
102impl From<Operation> for types::access::Operation {
103    fn from(value: Operation) -> Self {
104        match value {
105            Operation::ListBasins => Self::ListBasins,
106            Operation::CreateBasin => Self::CreateBasin,
107            Operation::DeleteBasin => Self::DeleteBasin,
108            Operation::ReconfigureBasin => Self::ReconfigureBasin,
109            Operation::GetBasinConfig => Self::GetBasinConfig,
110            Operation::IssueAccessToken => Self::IssueAccessToken,
111            Operation::RevokeAccessToken => Self::RevokeAccessToken,
112            Operation::ListAccessTokens => Self::ListAccessTokens,
113            Operation::ListStreams => Self::ListStreams,
114            Operation::CreateStream => Self::CreateStream,
115            Operation::DeleteStream => Self::DeleteStream,
116            Operation::GetStreamConfig => Self::GetStreamConfig,
117            Operation::ReconfigureStream => Self::ReconfigureStream,
118            Operation::CheckTail => Self::CheckTail,
119            Operation::Append => Self::Append,
120            Operation::Read => Self::Read,
121            Operation::Trim => Self::Trim,
122            Operation::Fence => Self::Fence,
123            Operation::AccountMetrics => Self::AccountMetrics,
124            Operation::BasinMetrics => Self::BasinMetrics,
125            Operation::StreamMetrics => Self::StreamMetrics,
126            Operation::ListLocations => Self::ListLocations,
127            Operation::GetDefaultLocation => Self::GetDefaultLocation,
128            Operation::SetDefaultLocation => Self::SetDefaultLocation,
129        }
130    }
131}
132
133impl From<types::access::Operation> for Operation {
134    fn from(value: types::access::Operation) -> Self {
135        use types::access::Operation::*;
136        match value {
137            ListBasins => Self::ListBasins,
138            CreateBasin => Self::CreateBasin,
139            DeleteBasin => Self::DeleteBasin,
140            ReconfigureBasin => Self::ReconfigureBasin,
141            GetBasinConfig => Self::GetBasinConfig,
142            IssueAccessToken => Self::IssueAccessToken,
143            RevokeAccessToken => Self::RevokeAccessToken,
144            ListAccessTokens => Self::ListAccessTokens,
145            ListStreams => Self::ListStreams,
146            CreateStream => Self::CreateStream,
147            DeleteStream => Self::DeleteStream,
148            GetStreamConfig => Self::GetStreamConfig,
149            ReconfigureStream => Self::ReconfigureStream,
150            CheckTail => Self::CheckTail,
151            Append => Self::Append,
152            Read => Self::Read,
153            Trim => Self::Trim,
154            Fence => Self::Fence,
155            AccountMetrics => Self::AccountMetrics,
156            BasinMetrics => Self::BasinMetrics,
157            StreamMetrics => Self::StreamMetrics,
158            ListLocations => Self::ListLocations,
159            GetDefaultLocation => Self::GetDefaultLocation,
160            SetDefaultLocation => Self::SetDefaultLocation,
161        }
162    }
163}
164
165#[rustfmt::skip]
166#[derive(Debug, Clone, Serialize, Deserialize)]
167#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
168pub struct AccessTokenInfo {
169    /// Access token ID.
170    /// It must be unique to the account and between 1 and 96 bytes in length.
171    pub id: types::access::AccessTokenId,
172    /// Expiration time in RFC 3339 format.
173    /// If not set, the expiration will be set to that of the requestor's token.
174    #[serde(default, with = "time::serde::rfc3339::option")]
175    pub expires_at: Option<OffsetDateTime>,
176    /// Namespace streams based on the configured stream-level scope, which must be a prefix.
177    /// Stream name arguments will be automatically prefixed, and the prefix will be stripped when listing streams.
178    #[cfg_attr(feature = "utoipa", schema(value_type = bool, default = false, required = false))]
179    pub auto_prefix_streams: Option<bool>,
180    /// Access token scope.
181    pub scope: AccessTokenScope,
182}
183
184impl TryFrom<AccessTokenInfo> for types::access::IssueAccessTokenRequest {
185    type Error = types::ValidationError;
186
187    fn try_from(value: AccessTokenInfo) -> Result<Self, Self::Error> {
188        Ok(Self {
189            id: value.id,
190            expires_at: value.expires_at,
191            auto_prefix_streams: value.auto_prefix_streams.unwrap_or_default(),
192            scope: value.scope.try_into()?,
193        })
194    }
195}
196
197impl From<types::access::AccessTokenInfo> for AccessTokenInfo {
198    fn from(value: types::access::AccessTokenInfo) -> Self {
199        Self {
200            id: value.id,
201            expires_at: Some(value.expires_at),
202            auto_prefix_streams: Some(value.auto_prefix_streams),
203            scope: value.scope.into(),
204        }
205    }
206}
207
208impl From<types::access::IssueAccessTokenRequest> for AccessTokenInfo {
209    fn from(value: types::access::IssueAccessTokenRequest) -> Self {
210        Self {
211            id: value.id,
212            expires_at: value.expires_at,
213            auto_prefix_streams: Some(value.auto_prefix_streams),
214            scope: value.scope.into(),
215        }
216    }
217}
218
219#[rustfmt::skip]
220#[derive(Debug, Clone, Serialize, Deserialize)]
221#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
222pub struct AccessTokenScope {
223    /// Basin names allowed.
224    pub basins: Option<ResourceSet<MaybeEmpty<BasinName>, BasinNamePrefix>>,
225    /// Stream names allowed.
226    pub streams: Option<ResourceSet<MaybeEmpty<StreamName>, StreamNamePrefix>>,
227    /// Token IDs allowed.
228    pub access_tokens:  Option<ResourceSet<MaybeEmpty<AccessTokenId>, AccessTokenIdPrefix>>,
229    /// Access permissions at operation group level.
230    pub op_groups: Option<PermittedOperationGroups>,
231    /// Operations allowed for the token.
232    /// A union of allowed operations and groups is used as an effective set of allowed operations.
233    #[cfg_attr(feature = "utoipa", schema(required = false))]
234    pub ops: Option<Vec<Operation>>,
235}
236
237impl TryFrom<AccessTokenScope> for types::access::AccessTokenScope {
238    type Error = types::ValidationError;
239
240    fn try_from(value: AccessTokenScope) -> Result<Self, Self::Error> {
241        let AccessTokenScope {
242            basins,
243            streams,
244            access_tokens,
245            op_groups,
246            ops,
247        } = value;
248
249        Ok(Self {
250            basins: basins.map(Into::into).unwrap_or_default(),
251            streams: streams.map(Into::into).unwrap_or_default(),
252            access_tokens: access_tokens.map(Into::into).unwrap_or_default(),
253            op_groups: op_groups.map(Into::into).unwrap_or_default(),
254            ops: ops
255                .map(|o| o.into_iter().map(types::access::Operation::from).collect())
256                .unwrap_or_default(),
257        })
258    }
259}
260
261impl From<types::access::AccessTokenScope> for AccessTokenScope {
262    fn from(value: types::access::AccessTokenScope) -> Self {
263        let types::access::AccessTokenScope {
264            basins,
265            streams,
266            access_tokens,
267            op_groups,
268            ops,
269        } = value;
270
271        Self {
272            basins: ResourceSet::to_opt(basins),
273            streams: ResourceSet::to_opt(streams),
274            access_tokens: ResourceSet::to_opt(access_tokens),
275            op_groups: Some(op_groups.into()),
276            ops: Some(ops.into_iter().map(Operation::from).collect()),
277        }
278    }
279}
280
281#[rustfmt::skip]
282#[derive(Debug, Clone, Serialize, Deserialize)]
283#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
284#[serde(rename_all = "kebab-case")]
285pub enum ResourceSet<E, P> {
286    /// Match only the resource with this exact name.
287    /// Use an empty string to match no resources.
288    #[cfg_attr(feature = "utoipa", schema(title = "exact", value_type = String))]
289    Exact(E),
290    /// Match all resources that start with this prefix.
291    /// Use an empty string to match all resource.
292    #[cfg_attr(feature = "utoipa", schema(title = "prefix", value_type = String))]
293    Prefix(P),
294}
295
296impl<E, P> ResourceSet<MaybeEmpty<E>, P> {
297    pub fn to_opt(rs: types::access::ResourceSet<E, P>) -> Option<Self> {
298        match rs {
299            types::access::ResourceSet::None => None,
300            types::access::ResourceSet::Exact(e) => {
301                Some(ResourceSet::Exact(MaybeEmpty::NonEmpty(e)))
302            }
303            types::access::ResourceSet::Prefix(p) => Some(ResourceSet::Prefix(p)),
304        }
305    }
306}
307
308impl<E, P> From<ResourceSet<MaybeEmpty<E>, P>> for types::access::ResourceSet<E, P> {
309    fn from(value: ResourceSet<MaybeEmpty<E>, P>) -> Self {
310        match value {
311            ResourceSet::Exact(MaybeEmpty::Empty) => Self::None,
312            ResourceSet::Exact(MaybeEmpty::NonEmpty(e)) => Self::Exact(e),
313            ResourceSet::Prefix(p) => Self::Prefix(p),
314        }
315    }
316}
317
318#[rustfmt::skip]
319#[derive(Debug, Clone, Serialize, Deserialize)]
320#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
321pub struct PermittedOperationGroups {
322    /// Account-level access permissions.
323    pub account: Option<ReadWritePermissions>,
324    /// Basin-level access permissions.
325    pub basin: Option<ReadWritePermissions>,
326    /// Stream-level access permissions.
327    pub stream: Option<ReadWritePermissions>,
328}
329
330impl From<PermittedOperationGroups> for types::access::PermittedOperationGroups {
331    fn from(value: PermittedOperationGroups) -> Self {
332        let PermittedOperationGroups {
333            account,
334            basin,
335            stream,
336        } = value;
337
338        Self {
339            account: account.map(Into::into).unwrap_or_default(),
340            basin: basin.map(Into::into).unwrap_or_default(),
341            stream: stream.map(Into::into).unwrap_or_default(),
342        }
343    }
344}
345
346impl From<types::access::PermittedOperationGroups> for PermittedOperationGroups {
347    fn from(value: types::access::PermittedOperationGroups) -> Self {
348        let types::access::PermittedOperationGroups {
349            account,
350            basin,
351            stream,
352        } = value;
353
354        Self {
355            account: Some(account.into()),
356            basin: Some(basin.into()),
357            stream: Some(stream.into()),
358        }
359    }
360}
361
362#[rustfmt::skip]
363#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
364#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
365pub struct ReadWritePermissions {
366    /// Read permission.
367    #[cfg_attr(feature = "utoipa", schema(value_type = bool, default = false, required = false))]
368    pub read: Option<bool>,
369    /// Write permission.
370    #[cfg_attr(feature = "utoipa", schema(value_type = bool, default = false, required = false))]
371    pub write: Option<bool>,
372}
373
374impl From<ReadWritePermissions> for types::access::ReadWritePermissions {
375    fn from(value: ReadWritePermissions) -> Self {
376        let ReadWritePermissions { read, write } = value;
377
378        Self {
379            read: read.unwrap_or_default(),
380            write: write.unwrap_or_default(),
381        }
382    }
383}
384
385impl From<types::access::ReadWritePermissions> for ReadWritePermissions {
386    fn from(value: types::access::ReadWritePermissions) -> Self {
387        let types::access::ReadWritePermissions { read, write } = value;
388
389        Self {
390            read: Some(read),
391            write: Some(write),
392        }
393    }
394}
395
396#[rustfmt::skip]
397#[derive(Debug, Clone, Serialize, Deserialize)]
398#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
399#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
400pub struct ListAccessTokensRequest {
401    /// Filter to access tokens whose IDs begin with this prefix.
402    #[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
403    pub prefix: Option<types::access::AccessTokenIdPrefix>,
404    /// Filter to access tokens whose IDs lexicographically start after this string.
405    #[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
406    pub start_after: Option<types::access::AccessTokenIdStartAfter>,
407    /// Number of results, up to a maximum of 1000.
408    #[cfg_attr(feature = "utoipa", param(value_type = usize, maximum = 1000, default = 1000, required = false))]
409    pub limit: Option<usize>,
410}
411
412super::impl_list_request_conversions!(
413    ListAccessTokensRequest,
414    types::access::AccessTokenIdPrefix,
415    types::access::AccessTokenIdStartAfter
416);
417
418#[rustfmt::skip]
419#[derive(Debug, Clone, Serialize, Deserialize)]
420#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
421pub struct ListAccessTokensResponse {
422    /// Matching access tokens.
423    #[cfg_attr(feature = "utoipa", schema(max_items = 1000))]
424    pub access_tokens: Vec<AccessTokenInfo>,
425    /// Indicates that there are more access tokens that match the criteria.
426    pub has_more: bool,
427}
428
429#[rustfmt::skip]
430#[derive(Debug, Clone, Serialize, Deserialize)]
431#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
432pub struct IssueAccessTokenResponse {
433    /// Created access token.
434    pub access_token: String,
435}
436
437#[cfg(test)]
438mod tests {
439    use proptest::prelude::*;
440
441    use super::*;
442
443    fn random_basin_resource_set() -> impl Strategy<Value = serde_json::Value> {
444        prop_oneof![
445            Just(serde_json::json!({"exact": ""})),
446            "[a-z][a-z0-9]{7,20}".prop_map(|s| serde_json::json!({"exact": s})),
447            Just(serde_json::json!({"prefix": ""})),
448            "[a-z][a-z0-9]{0,10}".prop_map(|s| serde_json::json!({"prefix": s})),
449        ]
450    }
451
452    fn random_resource_set() -> impl Strategy<Value = serde_json::Value> {
453        prop_oneof![
454            Just(serde_json::json!({"exact": ""})),
455            "[a-z][a-z0-9]{0,20}".prop_map(|s| serde_json::json!({"exact": s})),
456            Just(serde_json::json!({"prefix": ""})),
457            "[a-z][a-z0-9]{0,10}".prop_map(|s| serde_json::json!({"prefix": s})),
458        ]
459    }
460
461    fn random_access_token_info() -> impl Strategy<Value = serde_json::Value> {
462        (
463            "[a-z][a-z0-9]{0,20}",
464            proptest::option::of(random_basin_resource_set()),
465            proptest::option::of(random_resource_set()),
466            proptest::option::of(random_resource_set()),
467        )
468            .prop_map(|(id, basins, streams, access_tokens)| {
469                serde_json::json!({
470                    "id": id,
471                    "scope": {
472                        "basins": basins,
473                        "streams": streams,
474                        "access_tokens": access_tokens
475                    }
476                })
477            })
478    }
479
480    proptest! {
481        #[test]
482        fn access_token_info_roundtrip(json in random_access_token_info()) {
483            let parsed: AccessTokenInfo = serde_json::from_value(json).unwrap();
484            let internal: types::access::IssueAccessTokenRequest = parsed.clone().try_into().unwrap();
485            let back: AccessTokenInfo = internal.into();
486            prop_assert_eq!(parsed.id, back.id);
487        }
488    }
489
490    #[test]
491    fn empty_exact_converts_to_resource_set_none() {
492        let json = serde_json::json!({
493            "id": "test-token",
494            "scope": {
495                "streams": {"exact": ""},
496                "basins": {"exact": ""},
497                "access_tokens": {"exact": ""}
498            }
499        });
500
501        let parsed: AccessTokenInfo = serde_json::from_value(json).unwrap();
502        let internal: types::access::IssueAccessTokenRequest = parsed.try_into().unwrap();
503
504        assert!(matches!(
505            internal.scope.streams,
506            types::access::ResourceSet::None
507        ));
508        assert!(matches!(
509            internal.scope.basins,
510            types::access::ResourceSet::None
511        ));
512        assert!(matches!(
513            internal.scope.access_tokens,
514            types::access::ResourceSet::None
515        ));
516    }
517
518    #[test]
519    fn missing_scope_fields_default_to_resource_set_none() {
520        let json = serde_json::json!({
521            "id": "test-token",
522            "scope": {}
523        });
524
525        let parsed: AccessTokenInfo = serde_json::from_value(json).unwrap();
526        let internal: types::access::IssueAccessTokenRequest = parsed.try_into().unwrap();
527
528        assert!(matches!(
529            internal.scope.streams,
530            types::access::ResourceSet::None
531        ));
532        assert!(matches!(
533            internal.scope.basins,
534            types::access::ResourceSet::None
535        ));
536        assert!(matches!(
537            internal.scope.access_tokens,
538            types::access::ResourceSet::None
539        ));
540    }
541}