pub struct S2 { /* private fields */ }Expand description
An S2 account.
Implementations§
Source§impl S2
impl S2
Sourcepub fn new(config: S2Config) -> Result<Self, S2Error>
pub fn new(config: S2Config) -> Result<Self, S2Error>
Create a new S2.
Examples found in repository?
examples/docs_overview.rs (line 9)
5fn main() -> Result<(), Box<dyn std::error::Error>> {
6 // ANCHOR: create-client
7 use s2_sdk::{S2, types::S2Config};
8
9 let client = S2::new(S2Config::new(std::env::var("S2_ACCESS_TOKEN")?))?;
10
11 let basin = client.basin("my-basin".parse()?);
12 let stream = basin.stream("my-stream".parse()?);
13 // ANCHOR_END: create-client
14
15 println!("Created client for stream: {:?}", stream);
16 Ok(())
17}More examples
examples/list_all_basins.rs (line 13)
8async fn main() -> Result<(), Box<dyn std::error::Error>> {
9 let access_token =
10 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
11
12 let config = S2Config::new(access_token);
13 let s2 = S2::new(config)?;
14
15 let input = ListAllBasinsInput::new();
16
17 let basins: Vec<_> = s2.list_all_basins(input).take(10).try_collect().await?;
18
19 println!("{basins:#?}");
20
21 Ok(())
22}examples/delete_basin.rs (line 15)
7async fn main() -> Result<(), Box<dyn std::error::Error>> {
8 let access_token =
9 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
10 let basin_name: BasinName = std::env::var("S2_BASIN")
11 .map_err(|_| "S2_BASIN env var not set")?
12 .parse()?;
13
14 let config = S2Config::new(access_token);
15 let s2 = S2::new(config)?;
16
17 let input = DeleteBasinInput::new(basin_name).with_ignore_not_found(true);
18 s2.delete_basin(input).await?;
19 println!("Deletion requested");
20
21 Ok(())
22}examples/list_streams.rs (line 14)
7async fn main() -> Result<(), Box<dyn std::error::Error>> {
8 let access_token =
9 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
10 let basin_name: BasinName = std::env::var("S2_BASIN")
11 .map_err(|_| "S2_BASIN env var not set")?
12 .parse()?;
13
14 let s2 = S2::new(S2Config::new(access_token))?;
15 let basin = s2.basin(basin_name);
16
17 let input = ListStreamsInput::new().with_prefix("my-".parse()?);
18 let page = basin.list_streams(input).await?;
19 println!("{page:#?}");
20
21 Ok(())
22}examples/delete_stream.rs (line 17)
7async fn main() -> Result<(), Box<dyn std::error::Error>> {
8 let access_token =
9 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
10 let basin_name: BasinName = std::env::var("S2_BASIN")
11 .map_err(|_| "S2_BASIN env var not set")?
12 .parse()?;
13 let stream_name: StreamName = std::env::var("S2_STREAM")
14 .map_err(|_| "S2_STREAM env var not set")?
15 .parse()?;
16
17 let s2 = S2::new(S2Config::new(access_token))?;
18 let basin = s2.basin(basin_name);
19
20 let input = DeleteStreamInput::new(stream_name);
21 basin.delete_stream(input).await?;
22 println!("Deletion requested");
23
24 Ok(())
25}examples/consumer.rs (line 14)
9async fn main() -> Result<(), Box<dyn std::error::Error>> {
10 let access_token = std::env::var("S2_ACCESS_TOKEN")?;
11 let basin_name: BasinName = std::env::var("S2_BASIN")?.parse()?;
12 let stream_name: StreamName = std::env::var("S2_STREAM")?.parse()?;
13
14 let s2 = S2::new(S2Config::new(access_token))?;
15 let stream = s2.basin(basin_name).stream(stream_name);
16
17 let input = ReadInput::new();
18 let mut batches = stream.read_session(input).await?;
19 loop {
20 select! {
21 batch = batches.next() => {
22 let Some(batch) = batch else { break };
23 let batch = batch?;
24 println!("{batch:?}");
25 }
26 _ = tokio::signal::ctrl_c() => break,
27 }
28 }
29
30 Ok(())
31}Additional examples can be found in:
- examples/reconfigure_stream.rs
- examples/create_basin.rs
- examples/get_latest_record.rs
- examples/reconfigure_basin.rs
- examples/explicit_trim.rs
- examples/issue_access_token.rs
- examples/create_stream.rs
- examples/producer.rs
- examples/docs_metrics.rs
- examples/docs_configuration.rs
- examples/docs_account_and_basins.rs
- examples/docs_streams.rs
Sourcepub fn basin(&self, name: BasinName) -> S2Basin
pub fn basin(&self, name: BasinName) -> S2Basin
Get an S2Basin.
Examples found in repository?
examples/docs_overview.rs (line 11)
5fn main() -> Result<(), Box<dyn std::error::Error>> {
6 // ANCHOR: create-client
7 use s2_sdk::{S2, types::S2Config};
8
9 let client = S2::new(S2Config::new(std::env::var("S2_ACCESS_TOKEN")?))?;
10
11 let basin = client.basin("my-basin".parse()?);
12 let stream = basin.stream("my-stream".parse()?);
13 // ANCHOR_END: create-client
14
15 println!("Created client for stream: {:?}", stream);
16 Ok(())
17}More examples
examples/list_streams.rs (line 15)
7async fn main() -> Result<(), Box<dyn std::error::Error>> {
8 let access_token =
9 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
10 let basin_name: BasinName = std::env::var("S2_BASIN")
11 .map_err(|_| "S2_BASIN env var not set")?
12 .parse()?;
13
14 let s2 = S2::new(S2Config::new(access_token))?;
15 let basin = s2.basin(basin_name);
16
17 let input = ListStreamsInput::new().with_prefix("my-".parse()?);
18 let page = basin.list_streams(input).await?;
19 println!("{page:#?}");
20
21 Ok(())
22}examples/delete_stream.rs (line 18)
7async fn main() -> Result<(), Box<dyn std::error::Error>> {
8 let access_token =
9 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
10 let basin_name: BasinName = std::env::var("S2_BASIN")
11 .map_err(|_| "S2_BASIN env var not set")?
12 .parse()?;
13 let stream_name: StreamName = std::env::var("S2_STREAM")
14 .map_err(|_| "S2_STREAM env var not set")?
15 .parse()?;
16
17 let s2 = S2::new(S2Config::new(access_token))?;
18 let basin = s2.basin(basin_name);
19
20 let input = DeleteStreamInput::new(stream_name);
21 basin.delete_stream(input).await?;
22 println!("Deletion requested");
23
24 Ok(())
25}examples/consumer.rs (line 15)
9async fn main() -> Result<(), Box<dyn std::error::Error>> {
10 let access_token = std::env::var("S2_ACCESS_TOKEN")?;
11 let basin_name: BasinName = std::env::var("S2_BASIN")?.parse()?;
12 let stream_name: StreamName = std::env::var("S2_STREAM")?.parse()?;
13
14 let s2 = S2::new(S2Config::new(access_token))?;
15 let stream = s2.basin(basin_name).stream(stream_name);
16
17 let input = ReadInput::new();
18 let mut batches = stream.read_session(input).await?;
19 loop {
20 select! {
21 batch = batches.next() => {
22 let Some(batch) = batch else { break };
23 let batch = batch?;
24 println!("{batch:?}");
25 }
26 _ = tokio::signal::ctrl_c() => break,
27 }
28 }
29
30 Ok(())
31}examples/reconfigure_stream.rs (line 21)
10async fn main() -> Result<(), Box<dyn std::error::Error>> {
11 let access_token =
12 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
13 let basin_name: BasinName = std::env::var("S2_BASIN")
14 .map_err(|_| "S2_BASIN env var not set")?
15 .parse()?;
16 let stream_name: StreamName = std::env::var("S2_STREAM")
17 .map_err(|_| "S2_STREAM env var not set")?
18 .parse()?;
19
20 let s2 = S2::new(S2Config::new(access_token))?;
21 let basin = s2.basin(basin_name);
22
23 let input = ReconfigureStreamInput::new(
24 stream_name,
25 StreamReconfiguration::new().with_retention_policy(RetentionPolicy::Age(10 * 24 * 60 * 60)),
26 );
27 let config = basin.reconfigure_stream(input).await?;
28 println!("{config:#?}");
29
30 Ok(())
31}examples/get_latest_record.rs (line 20)
9async fn main() -> Result<(), Box<dyn std::error::Error>> {
10 let access_token =
11 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
12 let basin_name: BasinName = std::env::var("S2_BASIN")
13 .map_err(|_| "S2_BASIN env var not set")?
14 .parse()?;
15 let stream_name: StreamName = std::env::var("S2_STREAM")
16 .map_err(|_| "S2_STREAM env var not set")?
17 .parse()?;
18
19 let s2 = S2::new(S2Config::new(access_token))?;
20 let stream = s2.basin(basin_name).stream(stream_name);
21
22 let input = ReadInput::new()
23 .with_start(ReadStart::new().with_from(ReadFrom::TailOffset(1)))
24 .with_stop(ReadStop::new().with_limits(ReadLimits::new().with_count(1)));
25 let batch = stream.read(input).await?;
26 println!("{batch:#?}");
27
28 Ok(())
29}Additional examples can be found in:
Sourcepub async fn list_basins(
&self,
input: ListBasinsInput,
) -> Result<Page<BasinInfo>, S2Error>
pub async fn list_basins( &self, input: ListBasinsInput, ) -> Result<Page<BasinInfo>, S2Error>
List a page of basins.
See list_all_basins for automatic pagination.
Examples found in repository?
examples/docs_account_and_basins.rs (line 25)
17async fn main() -> Result<(), Box<dyn std::error::Error>> {
18 let access_token = std::env::var("S2_ACCESS_TOKEN")?;
19 let basin_name: BasinName = std::env::var("S2_BASIN")?.parse()?;
20
21 let client = S2::new(S2Config::new(access_token))?;
22
23 // ANCHOR: basin-operations
24 // List basins
25 let basins = client.list_basins(ListBasinsInput::new()).await?;
26
27 // Create a basin
28 client
29 .create_basin(CreateBasinInput::new("my-events".parse()?))
30 .await?;
31
32 // Get configuration
33 let config = client.get_basin_config("my-events".parse()?).await?;
34
35 // Delete
36 client
37 .delete_basin(DeleteBasinInput::new("my-events".parse()?))
38 .await?;
39 // ANCHOR_END: basin-operations
40 println!("Basins: {:?}, config: {:?}", basins, config);
41
42 let basin = client.basin(basin_name);
43
44 // ANCHOR: stream-operations
45 // List streams
46 let streams = basin
47 .list_streams(ListStreamsInput::new().with_prefix("user-".parse()?))
48 .await?;
49
50 // Create a stream
51 // Optionally, pass `.with_config(StreamConfig { .. })` to CreateStreamInput.
52 basin
53 .create_stream(CreateStreamInput::new("user-actions".parse()?))
54 .await?;
55
56 // Get configuration
57 let config = basin.get_stream_config("user-actions".parse()?).await?;
58
59 // Delete
60 basin
61 .delete_stream(DeleteStreamInput::new("user-actions".parse()?))
62 .await?;
63 // ANCHOR_END: stream-operations
64 println!("Streams: {:?}, config: {:?}", streams, config);
65
66 // ANCHOR: access-token-basic
67 // List tokens (returns metadata, not the secret)
68 let tokens = client.list_access_tokens(Default::default()).await?;
69
70 // Issue a token scoped to streams under "users/1234/"
71 let result = client
72 .issue_access_token(
73 IssueAccessTokenInput::new(
74 "user-1234-rw-token".parse()?,
75 AccessTokenScopeInput::from_op_group_perms(
76 OperationGroupPermissions::new()
77 .with_stream(ReadWritePermissions::read_write()),
78 )
79 .with_basins(BasinMatcher::Prefix("".parse()?)) // all basins
80 .with_streams(StreamMatcher::Prefix("users/1234/".parse()?)),
81 )
82 .with_expires_at("2027-01-01T00:00:00Z".parse()?),
83 )
84 .await?;
85
86 // Revoke a token
87 client
88 .revoke_access_token("user-1234-rw-token".parse()?)
89 .await?;
90 // ANCHOR_END: access-token-basic
91 println!("Tokens: {:?}, issued: {:?}", tokens, result);
92
93 // ANCHOR: access-token-restricted
94 client
95 .issue_access_token(IssueAccessTokenInput::new(
96 "restricted-token".parse()?,
97 AccessTokenScopeInput::from_op_group_perms(
98 OperationGroupPermissions::new().with_stream(ReadWritePermissions::read_only()),
99 )
100 .with_basins(BasinMatcher::Exact("production".parse()?))
101 .with_streams(StreamMatcher::Prefix("logs/".parse()?)),
102 ))
103 .await?;
104 // ANCHOR_END: access-token-restricted
105
106 // Pagination examples - not executed by default
107 if false {
108 // ANCHOR: pagination
109 // Iterate through all streams with automatic pagination
110 let mut stream = basin.list_all_streams(ListAllStreamsInput::new());
111 while let Some(info) = stream.next().await {
112 let info = info?;
113 println!("{}", info.name);
114 }
115 // ANCHOR_END: pagination
116
117 // ANCHOR: pagination-filtering
118 // List streams with a prefix filter
119 let input = ListAllStreamsInput::new().with_prefix("events/".parse()?);
120 let mut stream = basin.list_all_streams(input);
121 while let Some(info) = stream.next().await {
122 println!("{}", info?.name);
123 }
124 // ANCHOR_END: pagination-filtering
125
126 // ANCHOR: pagination-deleted
127 // Include streams that are being deleted
128 let input = ListAllStreamsInput::new().with_include_deleted(true);
129 let mut stream = basin.list_all_streams(input);
130 while let Some(info) = stream.next().await {
131 let info = info?;
132 println!("{} {:?}", info.name, info.deleted_at);
133 }
134 // ANCHOR_END: pagination-deleted
135 }
136
137 Ok(())
138}Sourcepub fn list_all_basins(&self, input: ListAllBasinsInput) -> Streaming<BasinInfo>
pub fn list_all_basins(&self, input: ListAllBasinsInput) -> Streaming<BasinInfo>
List all basins, paginating automatically.
Examples found in repository?
examples/list_all_basins.rs (line 17)
8async fn main() -> Result<(), Box<dyn std::error::Error>> {
9 let access_token =
10 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
11
12 let config = S2Config::new(access_token);
13 let s2 = S2::new(config)?;
14
15 let input = ListAllBasinsInput::new();
16
17 let basins: Vec<_> = s2.list_all_basins(input).take(10).try_collect().await?;
18
19 println!("{basins:#?}");
20
21 Ok(())
22}Sourcepub async fn create_basin(
&self,
input: CreateBasinInput,
) -> Result<BasinInfo, S2Error>
pub async fn create_basin( &self, input: CreateBasinInput, ) -> Result<BasinInfo, S2Error>
Create a basin.
Examples found in repository?
examples/create_basin.rs (line 22)
7async fn main() -> Result<(), Box<dyn std::error::Error>> {
8 let access_token =
9 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
10 let basin_name: BasinName = std::env::var("S2_BASIN")
11 .map_err(|_| "S2_BASIN env var not set")?
12 .parse()?;
13
14 let config = S2Config::new(access_token);
15 let s2 = S2::new(config)?;
16
17 let input = CreateBasinInput::new(basin_name.clone()).with_config(
18 BasinConfig::new().with_default_stream_config(
19 StreamConfig::new().with_retention_policy(RetentionPolicy::Age(10 * 24 * 60 * 60)),
20 ),
21 );
22 let basin_info = s2.create_basin(input).await?;
23 println!("{basin_info:#?}");
24
25 let basin_config = s2.get_basin_config(basin_name).await?;
26 println!("{basin_config:#?}");
27
28 Ok(())
29}More examples
examples/docs_account_and_basins.rs (line 29)
17async fn main() -> Result<(), Box<dyn std::error::Error>> {
18 let access_token = std::env::var("S2_ACCESS_TOKEN")?;
19 let basin_name: BasinName = std::env::var("S2_BASIN")?.parse()?;
20
21 let client = S2::new(S2Config::new(access_token))?;
22
23 // ANCHOR: basin-operations
24 // List basins
25 let basins = client.list_basins(ListBasinsInput::new()).await?;
26
27 // Create a basin
28 client
29 .create_basin(CreateBasinInput::new("my-events".parse()?))
30 .await?;
31
32 // Get configuration
33 let config = client.get_basin_config("my-events".parse()?).await?;
34
35 // Delete
36 client
37 .delete_basin(DeleteBasinInput::new("my-events".parse()?))
38 .await?;
39 // ANCHOR_END: basin-operations
40 println!("Basins: {:?}, config: {:?}", basins, config);
41
42 let basin = client.basin(basin_name);
43
44 // ANCHOR: stream-operations
45 // List streams
46 let streams = basin
47 .list_streams(ListStreamsInput::new().with_prefix("user-".parse()?))
48 .await?;
49
50 // Create a stream
51 // Optionally, pass `.with_config(StreamConfig { .. })` to CreateStreamInput.
52 basin
53 .create_stream(CreateStreamInput::new("user-actions".parse()?))
54 .await?;
55
56 // Get configuration
57 let config = basin.get_stream_config("user-actions".parse()?).await?;
58
59 // Delete
60 basin
61 .delete_stream(DeleteStreamInput::new("user-actions".parse()?))
62 .await?;
63 // ANCHOR_END: stream-operations
64 println!("Streams: {:?}, config: {:?}", streams, config);
65
66 // ANCHOR: access-token-basic
67 // List tokens (returns metadata, not the secret)
68 let tokens = client.list_access_tokens(Default::default()).await?;
69
70 // Issue a token scoped to streams under "users/1234/"
71 let result = client
72 .issue_access_token(
73 IssueAccessTokenInput::new(
74 "user-1234-rw-token".parse()?,
75 AccessTokenScopeInput::from_op_group_perms(
76 OperationGroupPermissions::new()
77 .with_stream(ReadWritePermissions::read_write()),
78 )
79 .with_basins(BasinMatcher::Prefix("".parse()?)) // all basins
80 .with_streams(StreamMatcher::Prefix("users/1234/".parse()?)),
81 )
82 .with_expires_at("2027-01-01T00:00:00Z".parse()?),
83 )
84 .await?;
85
86 // Revoke a token
87 client
88 .revoke_access_token("user-1234-rw-token".parse()?)
89 .await?;
90 // ANCHOR_END: access-token-basic
91 println!("Tokens: {:?}, issued: {:?}", tokens, result);
92
93 // ANCHOR: access-token-restricted
94 client
95 .issue_access_token(IssueAccessTokenInput::new(
96 "restricted-token".parse()?,
97 AccessTokenScopeInput::from_op_group_perms(
98 OperationGroupPermissions::new().with_stream(ReadWritePermissions::read_only()),
99 )
100 .with_basins(BasinMatcher::Exact("production".parse()?))
101 .with_streams(StreamMatcher::Prefix("logs/".parse()?)),
102 ))
103 .await?;
104 // ANCHOR_END: access-token-restricted
105
106 // Pagination examples - not executed by default
107 if false {
108 // ANCHOR: pagination
109 // Iterate through all streams with automatic pagination
110 let mut stream = basin.list_all_streams(ListAllStreamsInput::new());
111 while let Some(info) = stream.next().await {
112 let info = info?;
113 println!("{}", info.name);
114 }
115 // ANCHOR_END: pagination
116
117 // ANCHOR: pagination-filtering
118 // List streams with a prefix filter
119 let input = ListAllStreamsInput::new().with_prefix("events/".parse()?);
120 let mut stream = basin.list_all_streams(input);
121 while let Some(info) = stream.next().await {
122 println!("{}", info?.name);
123 }
124 // ANCHOR_END: pagination-filtering
125
126 // ANCHOR: pagination-deleted
127 // Include streams that are being deleted
128 let input = ListAllStreamsInput::new().with_include_deleted(true);
129 let mut stream = basin.list_all_streams(input);
130 while let Some(info) = stream.next().await {
131 let info = info?;
132 println!("{} {:?}", info.name, info.deleted_at);
133 }
134 // ANCHOR_END: pagination-deleted
135 }
136
137 Ok(())
138}Sourcepub async fn ensure_basin(
&self,
input: EnsureBasinInput,
) -> Result<EnsureOutput<BasinInfo>, S2Error>
pub async fn ensure_basin( &self, input: EnsureBasinInput, ) -> Result<EnsureOutput<BasinInfo>, S2Error>
Ensure a basin.
If the basin doesn’t exist, creates the basin with specified configuration.
If the basin already exists:
- Its configuration is updated to the specified configuration, if different.
- Its configuration is unchanged, if the specified configuration is same.
Sourcepub async fn get_basin_config(
&self,
name: BasinName,
) -> Result<BasinConfig, S2Error>
pub async fn get_basin_config( &self, name: BasinName, ) -> Result<BasinConfig, S2Error>
Get basin configuration.
Examples found in repository?
examples/create_basin.rs (line 25)
7async fn main() -> Result<(), Box<dyn std::error::Error>> {
8 let access_token =
9 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
10 let basin_name: BasinName = std::env::var("S2_BASIN")
11 .map_err(|_| "S2_BASIN env var not set")?
12 .parse()?;
13
14 let config = S2Config::new(access_token);
15 let s2 = S2::new(config)?;
16
17 let input = CreateBasinInput::new(basin_name.clone()).with_config(
18 BasinConfig::new().with_default_stream_config(
19 StreamConfig::new().with_retention_policy(RetentionPolicy::Age(10 * 24 * 60 * 60)),
20 ),
21 );
22 let basin_info = s2.create_basin(input).await?;
23 println!("{basin_info:#?}");
24
25 let basin_config = s2.get_basin_config(basin_name).await?;
26 println!("{basin_config:#?}");
27
28 Ok(())
29}More examples
examples/docs_account_and_basins.rs (line 33)
17async fn main() -> Result<(), Box<dyn std::error::Error>> {
18 let access_token = std::env::var("S2_ACCESS_TOKEN")?;
19 let basin_name: BasinName = std::env::var("S2_BASIN")?.parse()?;
20
21 let client = S2::new(S2Config::new(access_token))?;
22
23 // ANCHOR: basin-operations
24 // List basins
25 let basins = client.list_basins(ListBasinsInput::new()).await?;
26
27 // Create a basin
28 client
29 .create_basin(CreateBasinInput::new("my-events".parse()?))
30 .await?;
31
32 // Get configuration
33 let config = client.get_basin_config("my-events".parse()?).await?;
34
35 // Delete
36 client
37 .delete_basin(DeleteBasinInput::new("my-events".parse()?))
38 .await?;
39 // ANCHOR_END: basin-operations
40 println!("Basins: {:?}, config: {:?}", basins, config);
41
42 let basin = client.basin(basin_name);
43
44 // ANCHOR: stream-operations
45 // List streams
46 let streams = basin
47 .list_streams(ListStreamsInput::new().with_prefix("user-".parse()?))
48 .await?;
49
50 // Create a stream
51 // Optionally, pass `.with_config(StreamConfig { .. })` to CreateStreamInput.
52 basin
53 .create_stream(CreateStreamInput::new("user-actions".parse()?))
54 .await?;
55
56 // Get configuration
57 let config = basin.get_stream_config("user-actions".parse()?).await?;
58
59 // Delete
60 basin
61 .delete_stream(DeleteStreamInput::new("user-actions".parse()?))
62 .await?;
63 // ANCHOR_END: stream-operations
64 println!("Streams: {:?}, config: {:?}", streams, config);
65
66 // ANCHOR: access-token-basic
67 // List tokens (returns metadata, not the secret)
68 let tokens = client.list_access_tokens(Default::default()).await?;
69
70 // Issue a token scoped to streams under "users/1234/"
71 let result = client
72 .issue_access_token(
73 IssueAccessTokenInput::new(
74 "user-1234-rw-token".parse()?,
75 AccessTokenScopeInput::from_op_group_perms(
76 OperationGroupPermissions::new()
77 .with_stream(ReadWritePermissions::read_write()),
78 )
79 .with_basins(BasinMatcher::Prefix("".parse()?)) // all basins
80 .with_streams(StreamMatcher::Prefix("users/1234/".parse()?)),
81 )
82 .with_expires_at("2027-01-01T00:00:00Z".parse()?),
83 )
84 .await?;
85
86 // Revoke a token
87 client
88 .revoke_access_token("user-1234-rw-token".parse()?)
89 .await?;
90 // ANCHOR_END: access-token-basic
91 println!("Tokens: {:?}, issued: {:?}", tokens, result);
92
93 // ANCHOR: access-token-restricted
94 client
95 .issue_access_token(IssueAccessTokenInput::new(
96 "restricted-token".parse()?,
97 AccessTokenScopeInput::from_op_group_perms(
98 OperationGroupPermissions::new().with_stream(ReadWritePermissions::read_only()),
99 )
100 .with_basins(BasinMatcher::Exact("production".parse()?))
101 .with_streams(StreamMatcher::Prefix("logs/".parse()?)),
102 ))
103 .await?;
104 // ANCHOR_END: access-token-restricted
105
106 // Pagination examples - not executed by default
107 if false {
108 // ANCHOR: pagination
109 // Iterate through all streams with automatic pagination
110 let mut stream = basin.list_all_streams(ListAllStreamsInput::new());
111 while let Some(info) = stream.next().await {
112 let info = info?;
113 println!("{}", info.name);
114 }
115 // ANCHOR_END: pagination
116
117 // ANCHOR: pagination-filtering
118 // List streams with a prefix filter
119 let input = ListAllStreamsInput::new().with_prefix("events/".parse()?);
120 let mut stream = basin.list_all_streams(input);
121 while let Some(info) = stream.next().await {
122 println!("{}", info?.name);
123 }
124 // ANCHOR_END: pagination-filtering
125
126 // ANCHOR: pagination-deleted
127 // Include streams that are being deleted
128 let input = ListAllStreamsInput::new().with_include_deleted(true);
129 let mut stream = basin.list_all_streams(input);
130 while let Some(info) = stream.next().await {
131 let info = info?;
132 println!("{} {:?}", info.name, info.deleted_at);
133 }
134 // ANCHOR_END: pagination-deleted
135 }
136
137 Ok(())
138}Sourcepub async fn delete_basin(&self, input: DeleteBasinInput) -> Result<(), S2Error>
pub async fn delete_basin(&self, input: DeleteBasinInput) -> Result<(), S2Error>
Delete a basin.
Examples found in repository?
examples/delete_basin.rs (line 18)
7async fn main() -> Result<(), Box<dyn std::error::Error>> {
8 let access_token =
9 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
10 let basin_name: BasinName = std::env::var("S2_BASIN")
11 .map_err(|_| "S2_BASIN env var not set")?
12 .parse()?;
13
14 let config = S2Config::new(access_token);
15 let s2 = S2::new(config)?;
16
17 let input = DeleteBasinInput::new(basin_name).with_ignore_not_found(true);
18 s2.delete_basin(input).await?;
19 println!("Deletion requested");
20
21 Ok(())
22}More examples
examples/docs_account_and_basins.rs (line 37)
17async fn main() -> Result<(), Box<dyn std::error::Error>> {
18 let access_token = std::env::var("S2_ACCESS_TOKEN")?;
19 let basin_name: BasinName = std::env::var("S2_BASIN")?.parse()?;
20
21 let client = S2::new(S2Config::new(access_token))?;
22
23 // ANCHOR: basin-operations
24 // List basins
25 let basins = client.list_basins(ListBasinsInput::new()).await?;
26
27 // Create a basin
28 client
29 .create_basin(CreateBasinInput::new("my-events".parse()?))
30 .await?;
31
32 // Get configuration
33 let config = client.get_basin_config("my-events".parse()?).await?;
34
35 // Delete
36 client
37 .delete_basin(DeleteBasinInput::new("my-events".parse()?))
38 .await?;
39 // ANCHOR_END: basin-operations
40 println!("Basins: {:?}, config: {:?}", basins, config);
41
42 let basin = client.basin(basin_name);
43
44 // ANCHOR: stream-operations
45 // List streams
46 let streams = basin
47 .list_streams(ListStreamsInput::new().with_prefix("user-".parse()?))
48 .await?;
49
50 // Create a stream
51 // Optionally, pass `.with_config(StreamConfig { .. })` to CreateStreamInput.
52 basin
53 .create_stream(CreateStreamInput::new("user-actions".parse()?))
54 .await?;
55
56 // Get configuration
57 let config = basin.get_stream_config("user-actions".parse()?).await?;
58
59 // Delete
60 basin
61 .delete_stream(DeleteStreamInput::new("user-actions".parse()?))
62 .await?;
63 // ANCHOR_END: stream-operations
64 println!("Streams: {:?}, config: {:?}", streams, config);
65
66 // ANCHOR: access-token-basic
67 // List tokens (returns metadata, not the secret)
68 let tokens = client.list_access_tokens(Default::default()).await?;
69
70 // Issue a token scoped to streams under "users/1234/"
71 let result = client
72 .issue_access_token(
73 IssueAccessTokenInput::new(
74 "user-1234-rw-token".parse()?,
75 AccessTokenScopeInput::from_op_group_perms(
76 OperationGroupPermissions::new()
77 .with_stream(ReadWritePermissions::read_write()),
78 )
79 .with_basins(BasinMatcher::Prefix("".parse()?)) // all basins
80 .with_streams(StreamMatcher::Prefix("users/1234/".parse()?)),
81 )
82 .with_expires_at("2027-01-01T00:00:00Z".parse()?),
83 )
84 .await?;
85
86 // Revoke a token
87 client
88 .revoke_access_token("user-1234-rw-token".parse()?)
89 .await?;
90 // ANCHOR_END: access-token-basic
91 println!("Tokens: {:?}, issued: {:?}", tokens, result);
92
93 // ANCHOR: access-token-restricted
94 client
95 .issue_access_token(IssueAccessTokenInput::new(
96 "restricted-token".parse()?,
97 AccessTokenScopeInput::from_op_group_perms(
98 OperationGroupPermissions::new().with_stream(ReadWritePermissions::read_only()),
99 )
100 .with_basins(BasinMatcher::Exact("production".parse()?))
101 .with_streams(StreamMatcher::Prefix("logs/".parse()?)),
102 ))
103 .await?;
104 // ANCHOR_END: access-token-restricted
105
106 // Pagination examples - not executed by default
107 if false {
108 // ANCHOR: pagination
109 // Iterate through all streams with automatic pagination
110 let mut stream = basin.list_all_streams(ListAllStreamsInput::new());
111 while let Some(info) = stream.next().await {
112 let info = info?;
113 println!("{}", info.name);
114 }
115 // ANCHOR_END: pagination
116
117 // ANCHOR: pagination-filtering
118 // List streams with a prefix filter
119 let input = ListAllStreamsInput::new().with_prefix("events/".parse()?);
120 let mut stream = basin.list_all_streams(input);
121 while let Some(info) = stream.next().await {
122 println!("{}", info?.name);
123 }
124 // ANCHOR_END: pagination-filtering
125
126 // ANCHOR: pagination-deleted
127 // Include streams that are being deleted
128 let input = ListAllStreamsInput::new().with_include_deleted(true);
129 let mut stream = basin.list_all_streams(input);
130 while let Some(info) = stream.next().await {
131 let info = info?;
132 println!("{} {:?}", info.name, info.deleted_at);
133 }
134 // ANCHOR_END: pagination-deleted
135 }
136
137 Ok(())
138}Sourcepub async fn reconfigure_basin(
&self,
input: ReconfigureBasinInput,
) -> Result<BasinConfig, S2Error>
pub async fn reconfigure_basin( &self, input: ReconfigureBasinInput, ) -> Result<BasinConfig, S2Error>
Reconfigure a basin.
Examples found in repository?
examples/reconfigure_basin.rs (line 30)
10async fn main() -> Result<(), Box<dyn std::error::Error>> {
11 let access_token =
12 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
13 let basin_name: BasinName = std::env::var("S2_BASIN")
14 .map_err(|_| "S2_BASIN env var not set")?
15 .parse()?;
16
17 let config = S2Config::new(access_token);
18 let s2 = S2::new(config)?;
19
20 let input = ReconfigureBasinInput::new(
21 basin_name,
22 BasinReconfiguration::new()
23 .with_default_stream_config(
24 StreamReconfiguration::new()
25 .with_storage_class(StorageClass::Standard)
26 .with_timestamping(TimestampingReconfiguration::new().with_uncapped(true)),
27 )
28 .with_create_stream_on_read(true),
29 );
30 let config = s2.reconfigure_basin(input).await?;
31 println!("{config:#?}");
32
33 Ok(())
34}Sourcepub async fn list_access_tokens(
&self,
input: ListAccessTokensInput,
) -> Result<Page<AccessTokenInfo>, S2Error>
pub async fn list_access_tokens( &self, input: ListAccessTokensInput, ) -> Result<Page<AccessTokenInfo>, S2Error>
List a page of access tokens.
See list_all_access_tokens for automatic pagination.
Examples found in repository?
examples/docs_account_and_basins.rs (line 68)
17async fn main() -> Result<(), Box<dyn std::error::Error>> {
18 let access_token = std::env::var("S2_ACCESS_TOKEN")?;
19 let basin_name: BasinName = std::env::var("S2_BASIN")?.parse()?;
20
21 let client = S2::new(S2Config::new(access_token))?;
22
23 // ANCHOR: basin-operations
24 // List basins
25 let basins = client.list_basins(ListBasinsInput::new()).await?;
26
27 // Create a basin
28 client
29 .create_basin(CreateBasinInput::new("my-events".parse()?))
30 .await?;
31
32 // Get configuration
33 let config = client.get_basin_config("my-events".parse()?).await?;
34
35 // Delete
36 client
37 .delete_basin(DeleteBasinInput::new("my-events".parse()?))
38 .await?;
39 // ANCHOR_END: basin-operations
40 println!("Basins: {:?}, config: {:?}", basins, config);
41
42 let basin = client.basin(basin_name);
43
44 // ANCHOR: stream-operations
45 // List streams
46 let streams = basin
47 .list_streams(ListStreamsInput::new().with_prefix("user-".parse()?))
48 .await?;
49
50 // Create a stream
51 // Optionally, pass `.with_config(StreamConfig { .. })` to CreateStreamInput.
52 basin
53 .create_stream(CreateStreamInput::new("user-actions".parse()?))
54 .await?;
55
56 // Get configuration
57 let config = basin.get_stream_config("user-actions".parse()?).await?;
58
59 // Delete
60 basin
61 .delete_stream(DeleteStreamInput::new("user-actions".parse()?))
62 .await?;
63 // ANCHOR_END: stream-operations
64 println!("Streams: {:?}, config: {:?}", streams, config);
65
66 // ANCHOR: access-token-basic
67 // List tokens (returns metadata, not the secret)
68 let tokens = client.list_access_tokens(Default::default()).await?;
69
70 // Issue a token scoped to streams under "users/1234/"
71 let result = client
72 .issue_access_token(
73 IssueAccessTokenInput::new(
74 "user-1234-rw-token".parse()?,
75 AccessTokenScopeInput::from_op_group_perms(
76 OperationGroupPermissions::new()
77 .with_stream(ReadWritePermissions::read_write()),
78 )
79 .with_basins(BasinMatcher::Prefix("".parse()?)) // all basins
80 .with_streams(StreamMatcher::Prefix("users/1234/".parse()?)),
81 )
82 .with_expires_at("2027-01-01T00:00:00Z".parse()?),
83 )
84 .await?;
85
86 // Revoke a token
87 client
88 .revoke_access_token("user-1234-rw-token".parse()?)
89 .await?;
90 // ANCHOR_END: access-token-basic
91 println!("Tokens: {:?}, issued: {:?}", tokens, result);
92
93 // ANCHOR: access-token-restricted
94 client
95 .issue_access_token(IssueAccessTokenInput::new(
96 "restricted-token".parse()?,
97 AccessTokenScopeInput::from_op_group_perms(
98 OperationGroupPermissions::new().with_stream(ReadWritePermissions::read_only()),
99 )
100 .with_basins(BasinMatcher::Exact("production".parse()?))
101 .with_streams(StreamMatcher::Prefix("logs/".parse()?)),
102 ))
103 .await?;
104 // ANCHOR_END: access-token-restricted
105
106 // Pagination examples - not executed by default
107 if false {
108 // ANCHOR: pagination
109 // Iterate through all streams with automatic pagination
110 let mut stream = basin.list_all_streams(ListAllStreamsInput::new());
111 while let Some(info) = stream.next().await {
112 let info = info?;
113 println!("{}", info.name);
114 }
115 // ANCHOR_END: pagination
116
117 // ANCHOR: pagination-filtering
118 // List streams with a prefix filter
119 let input = ListAllStreamsInput::new().with_prefix("events/".parse()?);
120 let mut stream = basin.list_all_streams(input);
121 while let Some(info) = stream.next().await {
122 println!("{}", info?.name);
123 }
124 // ANCHOR_END: pagination-filtering
125
126 // ANCHOR: pagination-deleted
127 // Include streams that are being deleted
128 let input = ListAllStreamsInput::new().with_include_deleted(true);
129 let mut stream = basin.list_all_streams(input);
130 while let Some(info) = stream.next().await {
131 let info = info?;
132 println!("{} {:?}", info.name, info.deleted_at);
133 }
134 // ANCHOR_END: pagination-deleted
135 }
136
137 Ok(())
138}Sourcepub fn list_all_access_tokens(
&self,
input: ListAllAccessTokensInput,
) -> Streaming<AccessTokenInfo>
pub fn list_all_access_tokens( &self, input: ListAllAccessTokensInput, ) -> Streaming<AccessTokenInfo>
List all access tokens, paginating automatically.
Sourcepub async fn issue_access_token(
&self,
input: IssueAccessTokenInput,
) -> Result<String, S2Error>
pub async fn issue_access_token( &self, input: IssueAccessTokenInput, ) -> Result<String, S2Error>
Issue an access token.
Examples found in repository?
examples/issue_access_token.rs (line 29)
10async fn main() -> Result<(), Box<dyn std::error::Error>> {
11 let access_token =
12 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
13 let basin_name: BasinName = std::env::var("S2_BASIN")
14 .map_err(|_| "S2_BASIN env var not set")?
15 .parse()?;
16
17 let config = S2Config::new(access_token);
18 let s2 = S2::new(config)?;
19
20 let input = IssueAccessTokenInput::new(
21 "ro-token".parse()?,
22 AccessTokenScopeInput::from_op_group_perms(
23 OperationGroupPermissions::new().with_account(ReadWritePermissions::read_only()),
24 )
25 .with_ops([Operation::CreateStream])
26 .with_streams(StreamMatcher::Prefix("audit".parse()?))
27 .with_basins(BasinMatcher::Exact(basin_name)),
28 );
29 let issued_token = s2.issue_access_token(input).await?;
30 println!("Issued access token: {issued_token}");
31
32 Ok(())
33}More examples
examples/docs_account_and_basins.rs (lines 72-83)
17async fn main() -> Result<(), Box<dyn std::error::Error>> {
18 let access_token = std::env::var("S2_ACCESS_TOKEN")?;
19 let basin_name: BasinName = std::env::var("S2_BASIN")?.parse()?;
20
21 let client = S2::new(S2Config::new(access_token))?;
22
23 // ANCHOR: basin-operations
24 // List basins
25 let basins = client.list_basins(ListBasinsInput::new()).await?;
26
27 // Create a basin
28 client
29 .create_basin(CreateBasinInput::new("my-events".parse()?))
30 .await?;
31
32 // Get configuration
33 let config = client.get_basin_config("my-events".parse()?).await?;
34
35 // Delete
36 client
37 .delete_basin(DeleteBasinInput::new("my-events".parse()?))
38 .await?;
39 // ANCHOR_END: basin-operations
40 println!("Basins: {:?}, config: {:?}", basins, config);
41
42 let basin = client.basin(basin_name);
43
44 // ANCHOR: stream-operations
45 // List streams
46 let streams = basin
47 .list_streams(ListStreamsInput::new().with_prefix("user-".parse()?))
48 .await?;
49
50 // Create a stream
51 // Optionally, pass `.with_config(StreamConfig { .. })` to CreateStreamInput.
52 basin
53 .create_stream(CreateStreamInput::new("user-actions".parse()?))
54 .await?;
55
56 // Get configuration
57 let config = basin.get_stream_config("user-actions".parse()?).await?;
58
59 // Delete
60 basin
61 .delete_stream(DeleteStreamInput::new("user-actions".parse()?))
62 .await?;
63 // ANCHOR_END: stream-operations
64 println!("Streams: {:?}, config: {:?}", streams, config);
65
66 // ANCHOR: access-token-basic
67 // List tokens (returns metadata, not the secret)
68 let tokens = client.list_access_tokens(Default::default()).await?;
69
70 // Issue a token scoped to streams under "users/1234/"
71 let result = client
72 .issue_access_token(
73 IssueAccessTokenInput::new(
74 "user-1234-rw-token".parse()?,
75 AccessTokenScopeInput::from_op_group_perms(
76 OperationGroupPermissions::new()
77 .with_stream(ReadWritePermissions::read_write()),
78 )
79 .with_basins(BasinMatcher::Prefix("".parse()?)) // all basins
80 .with_streams(StreamMatcher::Prefix("users/1234/".parse()?)),
81 )
82 .with_expires_at("2027-01-01T00:00:00Z".parse()?),
83 )
84 .await?;
85
86 // Revoke a token
87 client
88 .revoke_access_token("user-1234-rw-token".parse()?)
89 .await?;
90 // ANCHOR_END: access-token-basic
91 println!("Tokens: {:?}, issued: {:?}", tokens, result);
92
93 // ANCHOR: access-token-restricted
94 client
95 .issue_access_token(IssueAccessTokenInput::new(
96 "restricted-token".parse()?,
97 AccessTokenScopeInput::from_op_group_perms(
98 OperationGroupPermissions::new().with_stream(ReadWritePermissions::read_only()),
99 )
100 .with_basins(BasinMatcher::Exact("production".parse()?))
101 .with_streams(StreamMatcher::Prefix("logs/".parse()?)),
102 ))
103 .await?;
104 // ANCHOR_END: access-token-restricted
105
106 // Pagination examples - not executed by default
107 if false {
108 // ANCHOR: pagination
109 // Iterate through all streams with automatic pagination
110 let mut stream = basin.list_all_streams(ListAllStreamsInput::new());
111 while let Some(info) = stream.next().await {
112 let info = info?;
113 println!("{}", info.name);
114 }
115 // ANCHOR_END: pagination
116
117 // ANCHOR: pagination-filtering
118 // List streams with a prefix filter
119 let input = ListAllStreamsInput::new().with_prefix("events/".parse()?);
120 let mut stream = basin.list_all_streams(input);
121 while let Some(info) = stream.next().await {
122 println!("{}", info?.name);
123 }
124 // ANCHOR_END: pagination-filtering
125
126 // ANCHOR: pagination-deleted
127 // Include streams that are being deleted
128 let input = ListAllStreamsInput::new().with_include_deleted(true);
129 let mut stream = basin.list_all_streams(input);
130 while let Some(info) = stream.next().await {
131 let info = info?;
132 println!("{} {:?}", info.name, info.deleted_at);
133 }
134 // ANCHOR_END: pagination-deleted
135 }
136
137 Ok(())
138}Sourcepub async fn revoke_access_token(
&self,
id: AccessTokenId,
) -> Result<(), S2Error>
pub async fn revoke_access_token( &self, id: AccessTokenId, ) -> Result<(), S2Error>
Revoke an access token.
Examples found in repository?
examples/docs_account_and_basins.rs (line 88)
17async fn main() -> Result<(), Box<dyn std::error::Error>> {
18 let access_token = std::env::var("S2_ACCESS_TOKEN")?;
19 let basin_name: BasinName = std::env::var("S2_BASIN")?.parse()?;
20
21 let client = S2::new(S2Config::new(access_token))?;
22
23 // ANCHOR: basin-operations
24 // List basins
25 let basins = client.list_basins(ListBasinsInput::new()).await?;
26
27 // Create a basin
28 client
29 .create_basin(CreateBasinInput::new("my-events".parse()?))
30 .await?;
31
32 // Get configuration
33 let config = client.get_basin_config("my-events".parse()?).await?;
34
35 // Delete
36 client
37 .delete_basin(DeleteBasinInput::new("my-events".parse()?))
38 .await?;
39 // ANCHOR_END: basin-operations
40 println!("Basins: {:?}, config: {:?}", basins, config);
41
42 let basin = client.basin(basin_name);
43
44 // ANCHOR: stream-operations
45 // List streams
46 let streams = basin
47 .list_streams(ListStreamsInput::new().with_prefix("user-".parse()?))
48 .await?;
49
50 // Create a stream
51 // Optionally, pass `.with_config(StreamConfig { .. })` to CreateStreamInput.
52 basin
53 .create_stream(CreateStreamInput::new("user-actions".parse()?))
54 .await?;
55
56 // Get configuration
57 let config = basin.get_stream_config("user-actions".parse()?).await?;
58
59 // Delete
60 basin
61 .delete_stream(DeleteStreamInput::new("user-actions".parse()?))
62 .await?;
63 // ANCHOR_END: stream-operations
64 println!("Streams: {:?}, config: {:?}", streams, config);
65
66 // ANCHOR: access-token-basic
67 // List tokens (returns metadata, not the secret)
68 let tokens = client.list_access_tokens(Default::default()).await?;
69
70 // Issue a token scoped to streams under "users/1234/"
71 let result = client
72 .issue_access_token(
73 IssueAccessTokenInput::new(
74 "user-1234-rw-token".parse()?,
75 AccessTokenScopeInput::from_op_group_perms(
76 OperationGroupPermissions::new()
77 .with_stream(ReadWritePermissions::read_write()),
78 )
79 .with_basins(BasinMatcher::Prefix("".parse()?)) // all basins
80 .with_streams(StreamMatcher::Prefix("users/1234/".parse()?)),
81 )
82 .with_expires_at("2027-01-01T00:00:00Z".parse()?),
83 )
84 .await?;
85
86 // Revoke a token
87 client
88 .revoke_access_token("user-1234-rw-token".parse()?)
89 .await?;
90 // ANCHOR_END: access-token-basic
91 println!("Tokens: {:?}, issued: {:?}", tokens, result);
92
93 // ANCHOR: access-token-restricted
94 client
95 .issue_access_token(IssueAccessTokenInput::new(
96 "restricted-token".parse()?,
97 AccessTokenScopeInput::from_op_group_perms(
98 OperationGroupPermissions::new().with_stream(ReadWritePermissions::read_only()),
99 )
100 .with_basins(BasinMatcher::Exact("production".parse()?))
101 .with_streams(StreamMatcher::Prefix("logs/".parse()?)),
102 ))
103 .await?;
104 // ANCHOR_END: access-token-restricted
105
106 // Pagination examples - not executed by default
107 if false {
108 // ANCHOR: pagination
109 // Iterate through all streams with automatic pagination
110 let mut stream = basin.list_all_streams(ListAllStreamsInput::new());
111 while let Some(info) = stream.next().await {
112 let info = info?;
113 println!("{}", info.name);
114 }
115 // ANCHOR_END: pagination
116
117 // ANCHOR: pagination-filtering
118 // List streams with a prefix filter
119 let input = ListAllStreamsInput::new().with_prefix("events/".parse()?);
120 let mut stream = basin.list_all_streams(input);
121 while let Some(info) = stream.next().await {
122 println!("{}", info?.name);
123 }
124 // ANCHOR_END: pagination-filtering
125
126 // ANCHOR: pagination-deleted
127 // Include streams that are being deleted
128 let input = ListAllStreamsInput::new().with_include_deleted(true);
129 let mut stream = basin.list_all_streams(input);
130 while let Some(info) = stream.next().await {
131 let info = info?;
132 println!("{} {:?}", info.name, info.deleted_at);
133 }
134 // ANCHOR_END: pagination-deleted
135 }
136
137 Ok(())
138}Sourcepub async fn list_locations(&self) -> Result<Vec<LocationInfo>, S2Error>
pub async fn list_locations(&self) -> Result<Vec<LocationInfo>, S2Error>
List locations.
Sourcepub async fn get_default_location(&self) -> Result<LocationInfo, S2Error>
pub async fn get_default_location(&self) -> Result<LocationInfo, S2Error>
Get the default location.
Sourcepub async fn set_default_location(
&self,
location: LocationName,
) -> Result<LocationInfo, S2Error>
pub async fn set_default_location( &self, location: LocationName, ) -> Result<LocationInfo, S2Error>
Set the default location.
Sourcepub async fn get_account_metrics(
&self,
input: GetAccountMetricsInput,
) -> Result<Vec<Metric>, S2Error>
pub async fn get_account_metrics( &self, input: GetAccountMetricsInput, ) -> Result<Vec<Metric>, S2Error>
Get account metrics.
Examples found in repository?
examples/docs_metrics.rs (lines 28-30)
14async fn main() -> Result<(), Box<dyn std::error::Error>> {
15 let access_token = std::env::var("S2_ACCESS_TOKEN")?;
16 let client = S2::new(S2Config::new(access_token))?;
17
18 // ANCHOR: metrics
19 let now = std::time::SystemTime::now()
20 .duration_since(std::time::UNIX_EPOCH)?
21 .as_secs() as u32;
22 let thirty_days_ago = now - 30 * 24 * 3600;
23 let six_hours_ago = now - 6 * 3600;
24 let hour_ago = now - 3600;
25
26 // Account-level: active basins over the last 30 days
27 let account_metrics = client
28 .get_account_metrics(GetAccountMetricsInput::new(AccountMetricSet::ActiveBasins(
29 TimeRange::new(thirty_days_ago, now),
30 )))
31 .await?;
32
33 // Basin-level: storage usage with hourly resolution
34 let basin_metrics = client
35 .get_basin_metrics(GetBasinMetricsInput::new(
36 "events".parse()?,
37 BasinMetricSet::Storage(TimeRange::new(six_hours_ago, now)),
38 ))
39 .await?;
40
41 // Stream-level: storage for a specific stream
42 let stream_metrics = client
43 .get_stream_metrics(GetStreamMetricsInput::new(
44 "events".parse()?,
45 "user-actions".parse()?,
46 StreamMetricSet::Storage(TimeRange::new(hour_ago, now)),
47 ))
48 .await?;
49 // ANCHOR_END: metrics
50
51 println!(
52 "{:?} {:?} {:?}",
53 account_metrics, basin_metrics, stream_metrics
54 );
55
56 Ok(())
57}Sourcepub async fn get_basin_metrics(
&self,
input: GetBasinMetricsInput,
) -> Result<Vec<Metric>, S2Error>
pub async fn get_basin_metrics( &self, input: GetBasinMetricsInput, ) -> Result<Vec<Metric>, S2Error>
Get basin metrics.
Examples found in repository?
examples/docs_metrics.rs (lines 35-38)
14async fn main() -> Result<(), Box<dyn std::error::Error>> {
15 let access_token = std::env::var("S2_ACCESS_TOKEN")?;
16 let client = S2::new(S2Config::new(access_token))?;
17
18 // ANCHOR: metrics
19 let now = std::time::SystemTime::now()
20 .duration_since(std::time::UNIX_EPOCH)?
21 .as_secs() as u32;
22 let thirty_days_ago = now - 30 * 24 * 3600;
23 let six_hours_ago = now - 6 * 3600;
24 let hour_ago = now - 3600;
25
26 // Account-level: active basins over the last 30 days
27 let account_metrics = client
28 .get_account_metrics(GetAccountMetricsInput::new(AccountMetricSet::ActiveBasins(
29 TimeRange::new(thirty_days_ago, now),
30 )))
31 .await?;
32
33 // Basin-level: storage usage with hourly resolution
34 let basin_metrics = client
35 .get_basin_metrics(GetBasinMetricsInput::new(
36 "events".parse()?,
37 BasinMetricSet::Storage(TimeRange::new(six_hours_ago, now)),
38 ))
39 .await?;
40
41 // Stream-level: storage for a specific stream
42 let stream_metrics = client
43 .get_stream_metrics(GetStreamMetricsInput::new(
44 "events".parse()?,
45 "user-actions".parse()?,
46 StreamMetricSet::Storage(TimeRange::new(hour_ago, now)),
47 ))
48 .await?;
49 // ANCHOR_END: metrics
50
51 println!(
52 "{:?} {:?} {:?}",
53 account_metrics, basin_metrics, stream_metrics
54 );
55
56 Ok(())
57}Sourcepub async fn get_stream_metrics(
&self,
input: GetStreamMetricsInput,
) -> Result<Vec<Metric>, S2Error>
pub async fn get_stream_metrics( &self, input: GetStreamMetricsInput, ) -> Result<Vec<Metric>, S2Error>
Get stream metrics.
Examples found in repository?
examples/docs_metrics.rs (lines 43-47)
14async fn main() -> Result<(), Box<dyn std::error::Error>> {
15 let access_token = std::env::var("S2_ACCESS_TOKEN")?;
16 let client = S2::new(S2Config::new(access_token))?;
17
18 // ANCHOR: metrics
19 let now = std::time::SystemTime::now()
20 .duration_since(std::time::UNIX_EPOCH)?
21 .as_secs() as u32;
22 let thirty_days_ago = now - 30 * 24 * 3600;
23 let six_hours_ago = now - 6 * 3600;
24 let hour_ago = now - 3600;
25
26 // Account-level: active basins over the last 30 days
27 let account_metrics = client
28 .get_account_metrics(GetAccountMetricsInput::new(AccountMetricSet::ActiveBasins(
29 TimeRange::new(thirty_days_ago, now),
30 )))
31 .await?;
32
33 // Basin-level: storage usage with hourly resolution
34 let basin_metrics = client
35 .get_basin_metrics(GetBasinMetricsInput::new(
36 "events".parse()?,
37 BasinMetricSet::Storage(TimeRange::new(six_hours_ago, now)),
38 ))
39 .await?;
40
41 // Stream-level: storage for a specific stream
42 let stream_metrics = client
43 .get_stream_metrics(GetStreamMetricsInput::new(
44 "events".parse()?,
45 "user-actions".parse()?,
46 StreamMetricSet::Storage(TimeRange::new(hour_ago, now)),
47 ))
48 .await?;
49 // ANCHOR_END: metrics
50
51 println!(
52 "{:?} {:?} {:?}",
53 account_metrics, basin_metrics, stream_metrics
54 );
55
56 Ok(())
57}Trait Implementations§
Auto Trait Implementations§
impl !RefUnwindSafe for S2
impl !UnwindSafe for S2
impl Freeze for S2
impl Send for S2
impl Sync for S2
impl Unpin for S2
impl UnsafeUnpin for S2
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more