docs_account_and_basins/
docs_account_and_basins.rs1use futures::StreamExt;
6use s2_sdk::{
7 S2,
8 types::{
9 AccessTokenScopeInput, BasinMatcher, BasinName, CreateBasinInput, CreateStreamInput,
10 DeleteBasinInput, DeleteStreamInput, IssueAccessTokenInput, ListAllStreamsInput,
11 ListBasinsInput, ListStreamsInput, OperationGroupPermissions, ReadWritePermissions,
12 S2Config, StreamMatcher,
13 },
14};
15
16#[tokio::main]
17async fn main() -> Result<(), Box<dyn std::error::Error>> {
18 let token = std::env::var("S2_ACCESS_TOKEN")?;
19 let basin_name: BasinName = std::env::var("S2_BASIN")?.parse()?;
20
21 let client = S2::new(S2Config::new(token))?;
22
23 let basins = client.list_basins(ListBasinsInput::new()).await?;
26
27 client
29 .create_basin(CreateBasinInput::new("my-events".parse()?))
30 .await?;
31
32 let config = client.get_basin_config("my-events".parse()?).await?;
34
35 client
37 .delete_basin(DeleteBasinInput::new("my-events".parse()?))
38 .await?;
39 println!("Basins: {:?}, config: {:?}", basins, config);
41
42 let basin = client.basin(basin_name);
43
44 let streams = basin
47 .list_streams(ListStreamsInput::new().with_prefix("user-".parse()?))
48 .await?;
49
50 basin
52 .create_stream(CreateStreamInput::new("user-actions".parse()?))
53 .await?;
54
55 let config = basin.get_stream_config("user-actions".parse()?).await?;
57
58 basin
60 .delete_stream(DeleteStreamInput::new("user-actions".parse()?))
61 .await?;
62 println!("Streams: {:?}, config: {:?}", streams, config);
64
65 let tokens = client.list_access_tokens(Default::default()).await?;
68
69 let result = client
71 .issue_access_token(
72 IssueAccessTokenInput::new(
73 "user-1234-rw-token".parse()?,
74 AccessTokenScopeInput::from_op_group_perms(
75 OperationGroupPermissions::new()
76 .with_stream(ReadWritePermissions::read_write()),
77 )
78 .with_basins(BasinMatcher::Prefix("".parse()?)) .with_streams(StreamMatcher::Prefix("users/1234/".parse()?)),
80 )
81 .with_expires_at("2027-01-01T00:00:00Z".parse()?),
82 )
83 .await?;
84
85 client
87 .revoke_access_token("user-1234-rw-token".parse()?)
88 .await?;
89 println!("Tokens: {:?}, issued: {:?}", tokens, result);
91
92 client
94 .issue_access_token(IssueAccessTokenInput::new(
95 "restricted-token".parse()?,
96 AccessTokenScopeInput::from_op_group_perms(
97 OperationGroupPermissions::new().with_stream(ReadWritePermissions::read_only()),
98 )
99 .with_basins(BasinMatcher::Exact("production".parse()?))
100 .with_streams(StreamMatcher::Prefix("logs/".parse()?)),
101 ))
102 .await?;
103 if false {
107 let mut stream = basin.list_all_streams(ListAllStreamsInput::new());
110 while let Some(info) = stream.next().await {
111 let info = info?;
112 println!("{}", info.name);
113 }
114 }
116
117 Ok(())
118}