Skip to main content

docs_account_and_basins/
docs_account_and_basins.rs

1//! Documentation examples for Account and Basins page.
2//!
3//! Run with: cargo run --example docs_account_and_basins
4
5use futures::StreamExt;
6use s2_sdk::{
7    S2,
8    types::{
9        AccessTokenScopeInput, BasinMatcher, BasinName, CreateBasinInput, CreateStreamInput,
10        DeleteBasinInput, DeleteStreamInput, IssueAccessTokenInput, ListAllStreamsInput,
11        ListBasinsInput, ListStreamsInput, OperationGroupPermissions, ReadWritePermissions,
12        S2Config, StreamMatcher,
13    },
14};
15
16#[tokio::main]
17async fn main() -> Result<(), Box<dyn std::error::Error>> {
18    let access_token = std::env::var("S2_ACCESS_TOKEN")?;
19    let basin_name: BasinName = std::env::var("S2_BASIN")?.parse()?;
20
21    let client = S2::new(S2Config::new(access_token))?;
22
23    // ANCHOR: basin-operations
24    // List basins
25    let basins = client.list_basins(ListBasinsInput::new()).await?;
26
27    // Create a basin
28    client
29        .create_basin(CreateBasinInput::new("my-events".parse()?))
30        .await?;
31
32    // Get configuration
33    let config = client.get_basin_config("my-events".parse()?).await?;
34
35    // Delete
36    client
37        .delete_basin(DeleteBasinInput::new("my-events".parse()?))
38        .await?;
39    // ANCHOR_END: basin-operations
40    println!("Basins: {:?}, config: {:?}", basins, config);
41
42    let basin = client.basin(basin_name);
43
44    // ANCHOR: stream-operations
45    // List streams
46    let streams = basin
47        .list_streams(ListStreamsInput::new().with_prefix("user-".parse()?))
48        .await?;
49
50    // Create a stream
51    // Optionally, pass `.with_config(StreamConfig { .. })` to CreateStreamInput.
52    basin
53        .create_stream(CreateStreamInput::new("user-actions".parse()?))
54        .await?;
55
56    // Get configuration
57    let config = basin.get_stream_config("user-actions".parse()?).await?;
58
59    // Delete
60    basin
61        .delete_stream(DeleteStreamInput::new("user-actions".parse()?))
62        .await?;
63    // ANCHOR_END: stream-operations
64    println!("Streams: {:?}, config: {:?}", streams, config);
65
66    // ANCHOR: access-token-basic
67    // List tokens (returns metadata, not the secret)
68    let tokens = client.list_access_tokens(Default::default()).await?;
69
70    // Issue a token scoped to streams under "users/1234/"
71    let result = client
72        .issue_access_token(
73            IssueAccessTokenInput::new(
74                "user-1234-rw-token".parse()?,
75                AccessTokenScopeInput::from_op_group_perms(
76                    OperationGroupPermissions::new()
77                        .with_stream(ReadWritePermissions::read_write()),
78                )
79                .with_basins(BasinMatcher::Prefix("".parse()?)) // all basins
80                .with_streams(StreamMatcher::Prefix("users/1234/".parse()?)),
81            )
82            .with_expires_at("2027-01-01T00:00:00Z".parse()?),
83        )
84        .await?;
85
86    // Revoke a token
87    client
88        .revoke_access_token("user-1234-rw-token".parse()?)
89        .await?;
90    // ANCHOR_END: access-token-basic
91    println!("Tokens: {:?}, issued: {:?}", tokens, result);
92
93    // ANCHOR: access-token-restricted
94    client
95        .issue_access_token(IssueAccessTokenInput::new(
96            "restricted-token".parse()?,
97            AccessTokenScopeInput::from_op_group_perms(
98                OperationGroupPermissions::new().with_stream(ReadWritePermissions::read_only()),
99            )
100            .with_basins(BasinMatcher::Exact("production".parse()?))
101            .with_streams(StreamMatcher::Prefix("logs/".parse()?)),
102        ))
103        .await?;
104    // ANCHOR_END: access-token-restricted
105
106    // Pagination examples - not executed by default
107    if false {
108        // ANCHOR: pagination
109        // Iterate through all streams with automatic pagination
110        let mut stream = basin.list_all_streams(ListAllStreamsInput::new());
111        while let Some(info) = stream.next().await {
112            let info = info?;
113            println!("{}", info.name);
114        }
115        // ANCHOR_END: pagination
116
117        // ANCHOR: pagination-filtering
118        // List streams with a prefix filter
119        let input = ListAllStreamsInput::new().with_prefix("events/".parse()?);
120        let mut stream = basin.list_all_streams(input);
121        while let Some(info) = stream.next().await {
122            println!("{}", info?.name);
123        }
124        // ANCHOR_END: pagination-filtering
125
126        // ANCHOR: pagination-deleted
127        // Include streams that are being deleted
128        let input = ListAllStreamsInput::new().with_include_deleted(true);
129        let mut stream = basin.list_all_streams(input);
130        while let Some(info) = stream.next().await {
131            let info = info?;
132            println!("{} {:?}", info.name, info.deleted_at);
133        }
134        // ANCHOR_END: pagination-deleted
135    }
136
137    Ok(())
138}