s2_api/v1/
access.rs

1use s2_common::types::{
2    self,
3    access::{AccessTokenId, AccessTokenIdPrefix},
4    basin::{BasinName, BasinNamePrefix},
5    stream::{StreamName, StreamNamePrefix},
6};
7use serde::{Deserialize, Serialize};
8
/// A value that is either absent (`Empty`) or present (`NonEmpty`).
///
/// The hand-written serde impls in this module represent `Empty` as the empty string `""`
/// and `NonEmpty` as the wrapped value's own serialized form.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub enum MaybeEmpty<T> {
    /// No value; written and read as the empty string.
    Empty,
    /// A present value of the wrapped type.
    NonEmpty(T),
}
15
16impl<T: Serialize> Serialize for MaybeEmpty<T> {
17    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
18    where
19        S: serde::Serializer,
20    {
21        match self {
22            Self::NonEmpty(v) => v.serialize(serializer),
23            Self::Empty => serializer.serialize_str(""),
24        }
25    }
26}
27
28impl<'de, T> Deserialize<'de> for MaybeEmpty<T>
29where
30    T: Deserialize<'de>,
31{
32    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
33    where
34        D: serde::Deserializer<'de>,
35    {
36        let s = String::deserialize(deserializer)?;
37        if s.is_empty() {
38            Ok(MaybeEmpty::Empty)
39        } else {
40            T::deserialize(serde::de::value::StringDeserializer::new(s)).map(MaybeEmpty::NonEmpty)
41        }
42    }
43}
44
45use time::OffsetDateTime;
46
/// Individual API operations that an access token can be allowed to perform.
///
/// Variants are serialized in kebab-case (for example `list-basins`) and convert one-to-one
/// to and from `types::access::Operation`.
#[rustfmt::skip]
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "kebab-case")]
pub enum Operation {
    /// List basins.
    ListBasins,
    /// Create a basin.
    CreateBasin,
    /// Delete a basin.
    DeleteBasin,
    /// Reconfigure a basin.
    ReconfigureBasin,
    /// Get basin configuration.
    GetBasinConfig,
    /// Issue an access token.
    IssueAccessToken,
    /// Revoke an access token.
    RevokeAccessToken,
    /// List access tokens.
    ListAccessTokens,
    /// List streams.
    ListStreams,
    /// Create a stream.
    CreateStream,
    /// Delete a stream.
    DeleteStream,
    /// Get stream configuration.
    GetStreamConfig,
    /// Reconfigure a stream.
    ReconfigureStream,
    /// Check the tail of a stream.
    CheckTail,
    /// Append records to a stream.
    Append,
    /// Read records from a stream.
    Read,
    /// Trim records on a stream.
    Trim,
    /// Set the fencing token on a stream.
    Fence,
    /// Retrieve account-level metrics.
    AccountMetrics,
    /// Retrieve basin-level metrics.
    BasinMetrics,
    /// Retrieve stream-level metrics.
    StreamMetrics,
}
95
96impl From<Operation> for types::access::Operation {
97    fn from(value: Operation) -> Self {
98        match value {
99            Operation::ListBasins => Self::ListBasins,
100            Operation::CreateBasin => Self::CreateBasin,
101            Operation::DeleteBasin => Self::DeleteBasin,
102            Operation::ReconfigureBasin => Self::ReconfigureBasin,
103            Operation::GetBasinConfig => Self::GetBasinConfig,
104            Operation::IssueAccessToken => Self::IssueAccessToken,
105            Operation::RevokeAccessToken => Self::RevokeAccessToken,
106            Operation::ListAccessTokens => Self::ListAccessTokens,
107            Operation::ListStreams => Self::ListStreams,
108            Operation::CreateStream => Self::CreateStream,
109            Operation::DeleteStream => Self::DeleteStream,
110            Operation::GetStreamConfig => Self::GetStreamConfig,
111            Operation::ReconfigureStream => Self::ReconfigureStream,
112            Operation::CheckTail => Self::CheckTail,
113            Operation::Append => Self::Append,
114            Operation::Read => Self::Read,
115            Operation::Trim => Self::Trim,
116            Operation::Fence => Self::Fence,
117            Operation::AccountMetrics => Self::AccountMetrics,
118            Operation::BasinMetrics => Self::BasinMetrics,
119            Operation::StreamMetrics => Self::StreamMetrics,
120        }
121    }
122}
123
124impl From<types::access::Operation> for Operation {
125    fn from(value: types::access::Operation) -> Self {
126        use types::access::Operation::*;
127        match value {
128            ListBasins => Self::ListBasins,
129            CreateBasin => Self::CreateBasin,
130            DeleteBasin => Self::DeleteBasin,
131            ReconfigureBasin => Self::ReconfigureBasin,
132            GetBasinConfig => Self::GetBasinConfig,
133            IssueAccessToken => Self::IssueAccessToken,
134            RevokeAccessToken => Self::RevokeAccessToken,
135            ListAccessTokens => Self::ListAccessTokens,
136            ListStreams => Self::ListStreams,
137            CreateStream => Self::CreateStream,
138            DeleteStream => Self::DeleteStream,
139            GetStreamConfig => Self::GetStreamConfig,
140            ReconfigureStream => Self::ReconfigureStream,
141            CheckTail => Self::CheckTail,
142            Append => Self::Append,
143            Read => Self::Read,
144            Trim => Self::Trim,
145            Fence => Self::Fence,
146            AccountMetrics => Self::AccountMetrics,
147            BasinMetrics => Self::BasinMetrics,
148            StreamMetrics => Self::StreamMetrics,
149        }
150    }
151}
152
/// API representation of an access token: its ID, optional expiry, stream auto-prefixing
/// flag, and scope.
///
/// Converts into `types::access::IssueAccessTokenRequest`, and is produced from both
/// `types::access::AccessTokenInfo` and `types::access::IssueAccessTokenRequest`.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct AccessTokenInfo {
    /// Access token ID.
    /// It must be unique to the account and between 1 and 96 bytes in length.
    pub id: types::access::AccessTokenId,
    /// Expiration time in RFC 3339 format.
    /// If not set, the expiration will be set to that of the requestor's token.
    // `default` allows the field to be omitted from the input, in which case it is `None`.
    #[serde(default, with = "time::serde::rfc3339::option")]
    pub expires_at: Option<OffsetDateTime>,
    /// Namespace streams based on the configured stream-level scope, which must be a prefix.
    /// Stream name arguments will be automatically prefixed, and the prefix will be stripped when listing streams.
    #[cfg_attr(feature = "utoipa", schema(value_type = bool, default = false, required = false))]
    pub auto_prefix_streams: Option<bool>,
    /// Access token scope.
    pub scope: AccessTokenScope,
}
171
172impl TryFrom<AccessTokenInfo> for types::access::IssueAccessTokenRequest {
173    type Error = types::ValidationError;
174
175    fn try_from(value: AccessTokenInfo) -> Result<Self, Self::Error> {
176        Ok(Self {
177            id: value.id,
178            expires_at: value.expires_at,
179            auto_prefix_streams: value.auto_prefix_streams.unwrap_or_default(),
180            scope: value.scope.try_into()?,
181        })
182    }
183}
184
185impl From<types::access::AccessTokenInfo> for AccessTokenInfo {
186    fn from(value: types::access::AccessTokenInfo) -> Self {
187        Self {
188            id: value.id,
189            expires_at: Some(value.expires_at),
190            auto_prefix_streams: Some(value.auto_prefix_streams),
191            scope: value.scope.into(),
192        }
193    }
194}
195
196impl From<types::access::IssueAccessTokenRequest> for AccessTokenInfo {
197    fn from(value: types::access::IssueAccessTokenRequest) -> Self {
198        Self {
199            id: value.id,
200            expires_at: value.expires_at,
201            auto_prefix_streams: Some(value.auto_prefix_streams),
202            scope: value.scope.into(),
203        }
204    }
205}
206
/// Scope of an access token: the basins, streams, and access tokens it applies to, and the
/// operations it permits.
///
/// Every field is optional. When converted to `types::access::AccessTokenScope`, an omitted
/// field becomes the `Default` value of the corresponding internal field.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct AccessTokenScope {
    /// Basin names allowed.
    pub basins: Option<ResourceSet<MaybeEmpty<BasinName>, BasinNamePrefix>>,
    /// Stream names allowed.
    pub streams: Option<ResourceSet<MaybeEmpty<StreamName>, StreamNamePrefix>>,
    /// Token IDs allowed.
    pub access_tokens:  Option<ResourceSet<MaybeEmpty<AccessTokenId>, AccessTokenIdPrefix>>,
    /// Access permissions at operation group level.
    pub op_groups: Option<PermittedOperationGroups>,
    /// Operations allowed for the token.
    /// A union of allowed operations and groups is used as an effective set of allowed operations.
    #[cfg_attr(feature = "utoipa", schema(required = false))]
    pub ops: Option<Vec<Operation>>,
}
224
225impl TryFrom<AccessTokenScope> for types::access::AccessTokenScope {
226    type Error = types::ValidationError;
227
228    fn try_from(value: AccessTokenScope) -> Result<Self, Self::Error> {
229        let AccessTokenScope {
230            basins,
231            streams,
232            access_tokens,
233            op_groups,
234            ops,
235        } = value;
236
237        Ok(Self {
238            basins: basins.map(Into::into).unwrap_or_default(),
239            streams: streams.map(Into::into).unwrap_or_default(),
240            access_tokens: access_tokens.map(Into::into).unwrap_or_default(),
241            op_groups: op_groups.map(Into::into).unwrap_or_default(),
242            ops: ops
243                .map(|o| o.into_iter().map(types::access::Operation::from).collect())
244                .unwrap_or_default(),
245        })
246    }
247}
248
249impl From<types::access::AccessTokenScope> for AccessTokenScope {
250    fn from(value: types::access::AccessTokenScope) -> Self {
251        let types::access::AccessTokenScope {
252            basins,
253            streams,
254            access_tokens,
255            op_groups,
256            ops,
257        } = value;
258
259        Self {
260            basins: ResourceSet::to_opt(basins),
261            streams: ResourceSet::to_opt(streams),
262            access_tokens: ResourceSet::to_opt(access_tokens),
263            op_groups: Some(op_groups.into()),
264            ops: Some(ops.into_iter().map(Operation::from).collect()),
265        }
266    }
267}
268
/// A set of resources selected either by exact name or by name prefix.
///
/// Serialized with kebab-case variant names, i.e. keyed by `exact` or `prefix`.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "kebab-case")]
pub enum ResourceSet<E, P> {
    /// Match only the resource with this exact name.
    /// Use an empty string to match no resources.
    #[cfg_attr(feature = "utoipa", schema(title = "exact", value_type = String))]
    Exact(E),
    /// Match all resources that start with this prefix.
    /// Use an empty string to match all resources.
    #[cfg_attr(feature = "utoipa", schema(title = "prefix", value_type = String))]
    Prefix(P),
}
283
284impl<E, P> ResourceSet<MaybeEmpty<E>, P> {
285    pub fn to_opt(rs: types::access::ResourceSet<E, P>) -> Option<Self> {
286        match rs {
287            types::access::ResourceSet::None => None,
288            types::access::ResourceSet::Exact(e) => {
289                Some(ResourceSet::Exact(MaybeEmpty::NonEmpty(e)))
290            }
291            types::access::ResourceSet::Prefix(p) => Some(ResourceSet::Prefix(p)),
292        }
293    }
294}
295
296impl<E, P> From<ResourceSet<MaybeEmpty<E>, P>> for types::access::ResourceSet<E, P> {
297    fn from(value: ResourceSet<MaybeEmpty<E>, P>) -> Self {
298        match value {
299            ResourceSet::Exact(MaybeEmpty::Empty) => Self::None,
300            ResourceSet::Exact(MaybeEmpty::NonEmpty(e)) => Self::Exact(e),
301            ResourceSet::Prefix(p) => Self::Prefix(p),
302        }
303    }
304}
305
/// Read/write permissions granted per operation group: account, basin, and stream.
///
/// When converted to `types::access::PermittedOperationGroups`, an omitted group becomes the
/// `Default` value of the corresponding internal field.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct PermittedOperationGroups {
    /// Account-level access permissions.
    pub account: Option<ReadWritePermissions>,
    /// Basin-level access permissions.
    pub basin: Option<ReadWritePermissions>,
    /// Stream-level access permissions.
    pub stream: Option<ReadWritePermissions>,
}
317
318impl From<PermittedOperationGroups> for types::access::PermittedOperationGroups {
319    fn from(value: PermittedOperationGroups) -> Self {
320        let PermittedOperationGroups {
321            account,
322            basin,
323            stream,
324        } = value;
325
326        Self {
327            account: account.map(Into::into).unwrap_or_default(),
328            basin: basin.map(Into::into).unwrap_or_default(),
329            stream: stream.map(Into::into).unwrap_or_default(),
330        }
331    }
332}
333
334impl From<types::access::PermittedOperationGroups> for PermittedOperationGroups {
335    fn from(value: types::access::PermittedOperationGroups) -> Self {
336        let types::access::PermittedOperationGroups {
337            account,
338            basin,
339            stream,
340        } = value;
341
342        Self {
343            account: Some(account.into()),
344            basin: Some(basin.into()),
345            stream: Some(stream.into()),
346        }
347    }
348}
349
/// Read and write permission flags.
///
/// When converted to `types::access::ReadWritePermissions`, an omitted flag is treated as
/// `false`.
#[rustfmt::skip]
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct ReadWritePermissions {
    /// Read permission.
    #[cfg_attr(feature = "utoipa", schema(value_type = bool, default = false, required = false))]
    pub read: Option<bool>,
    /// Write permission.
    #[cfg_attr(feature = "utoipa", schema(value_type = bool, default = false, required = false))]
    pub write: Option<bool>,
}
361
362impl From<ReadWritePermissions> for types::access::ReadWritePermissions {
363    fn from(value: ReadWritePermissions) -> Self {
364        let ReadWritePermissions { read, write } = value;
365
366        Self {
367            read: read.unwrap_or_default(),
368            write: write.unwrap_or_default(),
369        }
370    }
371}
372
373impl From<types::access::ReadWritePermissions> for ReadWritePermissions {
374    fn from(value: types::access::ReadWritePermissions) -> Self {
375        let types::access::ReadWritePermissions { read, write } = value;
376
377        Self {
378            read: Some(read),
379            write: Some(write),
380        }
381    }
382}
383
/// Parameters for listing access tokens.
///
/// With the `utoipa` feature enabled these are documented as query parameters. All fields
/// are optional.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
pub struct ListAccessTokensRequest {
    /// Filter to access tokens whose ID begins with this prefix.
    #[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
    pub prefix: Option<types::access::AccessTokenIdPrefix>,
    /// Filter to access tokens whose ID lexicographically starts after this string.
    #[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
    pub start_after: Option<types::access::AccessTokenIdStartAfter>,
    /// Number of results, up to a maximum of 1000.
    #[cfg_attr(feature = "utoipa", param(value_type = usize, maximum = 1000, default = 1000, required = false))]
    pub limit: Option<usize>,
}
399
// Expands the parent module's list-request conversion macro for `ListAccessTokensRequest`,
// parameterized by the access token prefix and start-after types.
// NOTE(review): the impls this generates are defined by `impl_list_request_conversions!`,
// which is not visible from this file — confirm there.
super::impl_list_request_conversions!(
    ListAccessTokensRequest,
    types::access::AccessTokenIdPrefix,
    types::access::AccessTokenIdStartAfter
);
405
/// Response listing access tokens, with a flag indicating whether more matches exist.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct ListAccessTokensResponse {
    /// Matching access tokens.
    #[cfg_attr(feature = "utoipa", schema(max_items = 1000))]
    pub access_tokens: Vec<AccessTokenInfo>,
    /// Indicates that there are more access tokens that match the criteria.
    pub has_more: bool,
}
416
/// Response carrying a newly created access token.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct IssueAccessTokenResponse {
    /// Created access token.
    pub access_token: String,
}
424
// Tests for JSON parsing of `AccessTokenInfo` and its conversion to
// `types::access::IssueAccessTokenRequest`.
#[cfg(test)]
mod tests {
    use proptest::prelude::*;

    use super::*;

    // Strategy producing JSON resource sets for the `basins` scope field: an empty or
    // 8-21 character exact name, or an empty or 1-11 character prefix.
    // NOTE(review): the longer minimum for exact names presumably reflects `BasinName`
    // validation — confirm against that type.
    fn random_basin_resource_set() -> impl Strategy<Value = serde_json::Value> {
        prop_oneof![
            Just(serde_json::json!({"exact": ""})),
            "[a-z][a-z0-9]{7,20}".prop_map(|s| serde_json::json!({"exact": s})),
            Just(serde_json::json!({"prefix": ""})),
            "[a-z][a-z0-9]{0,10}".prop_map(|s| serde_json::json!({"prefix": s})),
        ]
    }

    // Strategy producing JSON resource sets for the `streams` and `access_tokens` scope
    // fields: an empty or 1-21 character exact name, or an empty or 1-11 character prefix.
    fn random_resource_set() -> impl Strategy<Value = serde_json::Value> {
        prop_oneof![
            Just(serde_json::json!({"exact": ""})),
            "[a-z][a-z0-9]{0,20}".prop_map(|s| serde_json::json!({"exact": s})),
            Just(serde_json::json!({"prefix": ""})),
            "[a-z][a-z0-9]{0,10}".prop_map(|s| serde_json::json!({"prefix": s})),
        ]
    }

    // Strategy producing a JSON `AccessTokenInfo` payload with a random ID and optional
    // resource sets; absent sets are emitted as JSON `null`.
    fn random_access_token_info() -> impl Strategy<Value = serde_json::Value> {
        (
            "[a-z][a-z0-9]{0,20}",
            proptest::option::of(random_basin_resource_set()),
            proptest::option::of(random_resource_set()),
            proptest::option::of(random_resource_set()),
        )
            .prop_map(|(id, basins, streams, access_tokens)| {
                serde_json::json!({
                    "id": id,
                    "scope": {
                        "basins": basins,
                        "streams": streams,
                        "access_tokens": access_tokens
                    }
                })
            })
    }

    proptest! {
        // Parses a generated payload, converts it to the internal request and back, and
        // checks that the token ID survives the round trip. Parsing and conversion must
        // succeed for every generated input.
        #[test]
        fn access_token_info_roundtrip(json in random_access_token_info()) {
            let parsed: AccessTokenInfo = serde_json::from_value(json).unwrap();
            let internal: types::access::IssueAccessTokenRequest = parsed.clone().try_into().unwrap();
            let back: AccessTokenInfo = internal.into();
            prop_assert_eq!(parsed.id, back.id);
        }
    }

    // An explicit `{"exact": ""}` for each resource kind must convert to the internal
    // `ResourceSet::None`.
    #[test]
    fn empty_exact_converts_to_resource_set_none() {
        let json = serde_json::json!({
            "id": "test-token",
            "scope": {
                "streams": {"exact": ""},
                "basins": {"exact": ""},
                "access_tokens": {"exact": ""}
            }
        });

        let parsed: AccessTokenInfo = serde_json::from_value(json).unwrap();
        let internal: types::access::IssueAccessTokenRequest = parsed.try_into().unwrap();

        assert!(matches!(
            internal.scope.streams,
            types::access::ResourceSet::None
        ));
        assert!(matches!(
            internal.scope.basins,
            types::access::ResourceSet::None
        ));
        assert!(matches!(
            internal.scope.access_tokens,
            types::access::ResourceSet::None
        ));
    }

    // Scope fields that are absent from the payload must also convert to the internal
    // `ResourceSet::None`.
    #[test]
    fn missing_scope_fields_default_to_resource_set_none() {
        let json = serde_json::json!({
            "id": "test-token",
            "scope": {}
        });

        let parsed: AccessTokenInfo = serde_json::from_value(json).unwrap();
        let internal: types::access::IssueAccessTokenRequest = parsed.try_into().unwrap();

        assert!(matches!(
            internal.scope.streams,
            types::access::ResourceSet::None
        ));
        assert!(matches!(
            internal.scope.basins,
            types::access::ResourceSet::None
        ));
        assert!(matches!(
            internal.scope.access_tokens,
            types::access::ResourceSet::None
        ));
    }
}