pub struct StreamConfig {
pub storage_class: StorageClass,
pub retention_policy: Option<RetentionPolicy>,
pub require_client_timestamps: Option<bool>,
pub uncapped_client_timestamps: Option<bool>,
}
Expand description
Stream configuration.
Fields§
storage_class: StorageClass — Storage class for recent writes. This is the main cost/performance knob in S2.
retention_policy: Option<RetentionPolicy> — Retention policy for the stream. If unspecified, the default is to retain records for 7 days.
require_client_timestamps: Option<bool> — Controls how to handle timestamps when they are not provided by the client. If this is false (or not set), the record’s arrival time in milliseconds since Unix epoch will be assigned as its timestamp. If this is true, then any append without a client-specified timestamp will be rejected as invalid.
uncapped_client_timestamps: Option<bool> — Allow client timestamps to exceed the arrival time in milliseconds since Unix epoch. If this is false (or not set), client timestamps will be capped at the arrival time.
Implementations§
Source§impl StreamConfig
impl StreamConfig
Sourcepub fn new() -> Self
pub fn new() -> Self
Create a new stream config.
Examples found in repository?
7async fn main() -> Result<(), Box<dyn std::error::Error>> {
8 let token = std::env::var("S2_ACCESS_TOKEN")?;
9 let config = ClientConfig::new(token);
10 let client = Client::new(config);
11
12 let basin: BasinName = "my-favorite-basin".parse()?;
13 let basin_client = client.basin_client(basin);
14
15 let stream = "my-favorite-stream";
16
17 let stream_config = StreamConfig::new().with_storage_class(StorageClass::Express);
18
19 let create_stream_request = CreateStreamRequest::new(stream).with_config(stream_config);
20
21 let created_stream = basin_client.create_stream(create_stream_request).await?;
22 println!("{created_stream:#?}");
23
24 let stream_config = basin_client.get_stream_config(stream).await?;
25 println!("{stream_config:#?}");
26
27 Ok(())
28}
More examples
9async fn main() -> Result<(), Box<dyn std::error::Error>> {
10 let token = std::env::var("S2_ACCESS_TOKEN")?;
11 let config = ClientConfig::new(token);
12 let basin: BasinName = "my-favorite-basin".parse()?;
13 let basin_client = BasinClient::new(config, basin);
14
15 let stream = "my-favorite-stream";
16
17 let stream_config_updates = StreamConfig::new().with_retention_policy(RetentionPolicy::Age(
18 // Change the retention policy to 1 day
19 Duration::from_secs(24 * 60 * 60),
20 ));
21
22 let reconfigure_stream_request = ReconfigureStreamRequest::new(stream)
23 .with_config(stream_config_updates)
24 // Field mask specifies which fields to update.
25 .with_mask(vec!["retention_policy".to_string()]);
26
27 let updated_stream_config = basin_client
28 .reconfigure_stream(reconfigure_stream_request)
29 .await?;
30
31 println!("{updated_stream_config:#?}");
32
33 Ok(())
34}
9 async fn main() -> Result<(), Box<dyn std::error::Error>> {
10 let token = std::env::var("S2_ACCESS_TOKEN")?;
11 let config = ClientConfig::new(token);
12 let client = Client::new(config);
13
14 let basin: BasinName = "my-favorite-basin".parse()?;
15
16 let default_stream_config = StreamConfig::new().with_retention_policy(RetentionPolicy::Age(
17 // Set the default retention age to 10 days.
18 Duration::from_secs(10 * 24 * 60 * 60),
19 ));
20
21 let basin_config = BasinConfig::new()
22 .with_default_stream_config(default_stream_config)
23 .with_create_stream_on_append(false)
24 .with_create_stream_on_read(false);
25
26 let create_basin_request = CreateBasinRequest::new(basin.clone()).with_config(basin_config);
27
28 let created_basin = client.create_basin(create_basin_request).await?;
29 println!("{created_basin:#?}");
30
31 let basin_config = client.get_basin_config(basin).await?;
32 println!("{basin_config:#?}");
33
34 Ok(())
35}
7 async fn main() -> Result<(), Box<dyn std::error::Error>> {
8 let token = std::env::var("S2_ACCESS_TOKEN")?;
9 let config = ClientConfig::new(token);
10 let client = Client::new(config);
11
12 let basin: BasinName = "my-favorite-basin".parse()?;
13
14 let default_stream_config_updates =
15 StreamConfig::new().with_storage_class(StorageClass::Standard);
16 let basin_config_updates = BasinConfig::new()
17 .with_default_stream_config(default_stream_config_updates)
18 .with_create_stream_on_append(true)
19 .with_create_stream_on_read(true);
20
21 let reconfigure_basin_request = ReconfigureBasinRequest::new(basin)
22 .with_config(basin_config_updates)
23 // Field mask specifies which fields to update.
24 .with_mask(vec!["default_stream_config.retention_policy".to_string()]);
25
26 let updated_basin_config = client.reconfigure_basin(reconfigure_basin_request).await?;
27
28 println!("{updated_basin_config:#?}");
29
30 Ok(())
31}
Source pub fn with_storage_class(self, storage_class: impl Into<StorageClass>) -> Self
pub fn with_storage_class(self, storage_class: impl Into<StorageClass>) -> Self
Overwrite storage class.
Examples found in repository?
7async fn main() -> Result<(), Box<dyn std::error::Error>> {
8 let token = std::env::var("S2_ACCESS_TOKEN")?;
9 let config = ClientConfig::new(token);
10 let client = Client::new(config);
11
12 let basin: BasinName = "my-favorite-basin".parse()?;
13 let basin_client = client.basin_client(basin);
14
15 let stream = "my-favorite-stream";
16
17 let stream_config = StreamConfig::new().with_storage_class(StorageClass::Express);
18
19 let create_stream_request = CreateStreamRequest::new(stream).with_config(stream_config);
20
21 let created_stream = basin_client.create_stream(create_stream_request).await?;
22 println!("{created_stream:#?}");
23
24 let stream_config = basin_client.get_stream_config(stream).await?;
25 println!("{stream_config:#?}");
26
27 Ok(())
28}
More examples
7async fn main() -> Result<(), Box<dyn std::error::Error>> {
8 let token = std::env::var("S2_ACCESS_TOKEN")?;
9 let config = ClientConfig::new(token);
10 let client = Client::new(config);
11
12 let basin: BasinName = "my-favorite-basin".parse()?;
13
14 let default_stream_config_updates =
15 StreamConfig::new().with_storage_class(StorageClass::Standard);
16 let basin_config_updates = BasinConfig::new()
17 .with_default_stream_config(default_stream_config_updates)
18 .with_create_stream_on_append(true)
19 .with_create_stream_on_read(true);
20
21 let reconfigure_basin_request = ReconfigureBasinRequest::new(basin)
22 .with_config(basin_config_updates)
23 // Field mask specifies which fields to update.
24 .with_mask(vec!["default_stream_config.retention_policy".to_string()]);
25
26 let updated_basin_config = client.reconfigure_basin(reconfigure_basin_request).await?;
27
28 println!("{updated_basin_config:#?}");
29
30 Ok(())
31}
Source pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self
pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self
Overwrite retention policy.
Examples found in repository?
9async fn main() -> Result<(), Box<dyn std::error::Error>> {
10 let token = std::env::var("S2_ACCESS_TOKEN")?;
11 let config = ClientConfig::new(token);
12 let basin: BasinName = "my-favorite-basin".parse()?;
13 let basin_client = BasinClient::new(config, basin);
14
15 let stream = "my-favorite-stream";
16
17 let stream_config_updates = StreamConfig::new().with_retention_policy(RetentionPolicy::Age(
18 // Change the retention policy to 1 day
19 Duration::from_secs(24 * 60 * 60),
20 ));
21
22 let reconfigure_stream_request = ReconfigureStreamRequest::new(stream)
23 .with_config(stream_config_updates)
24 // Field mask specifies which fields to update.
25 .with_mask(vec!["retention_policy".to_string()]);
26
27 let updated_stream_config = basin_client
28 .reconfigure_stream(reconfigure_stream_request)
29 .await?;
30
31 println!("{updated_stream_config:#?}");
32
33 Ok(())
34}
More examples
9async fn main() -> Result<(), Box<dyn std::error::Error>> {
10 let token = std::env::var("S2_ACCESS_TOKEN")?;
11 let config = ClientConfig::new(token);
12 let client = Client::new(config);
13
14 let basin: BasinName = "my-favorite-basin".parse()?;
15
16 let default_stream_config = StreamConfig::new().with_retention_policy(RetentionPolicy::Age(
17 // Set the default retention age to 10 days.
18 Duration::from_secs(10 * 24 * 60 * 60),
19 ));
20
21 let basin_config = BasinConfig::new()
22 .with_default_stream_config(default_stream_config)
23 .with_create_stream_on_append(false)
24 .with_create_stream_on_read(false);
25
26 let create_basin_request = CreateBasinRequest::new(basin.clone()).with_config(basin_config);
27
28 let created_basin = client.create_basin(create_basin_request).await?;
29 println!("{created_basin:#?}");
30
31 let basin_config = client.get_basin_config(basin).await?;
32 println!("{basin_config:#?}");
33
34 Ok(())
35}
Source pub fn with_require_client_timestamps(
self,
require_client_timestamps: bool,
) -> Self
pub fn with_require_client_timestamps( self, require_client_timestamps: bool, ) -> Self
Overwrite require_client_timestamps.
Sourcepub fn with_uncapped_client_timestamps(
self,
uncapped_client_timestamps: bool,
) -> Self
pub fn with_uncapped_client_timestamps( self, uncapped_client_timestamps: bool, ) -> Self
Overwrite uncapped_client_timestamps.
Trait Implementations§
Source§impl Clone for StreamConfig
impl Clone for StreamConfig
Source§fn clone(&self) -> StreamConfig
fn clone(&self) -> StreamConfig
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source. Read more
Source§impl Debug for StreamConfig
impl Debug for StreamConfig
Source§impl Default for StreamConfig
impl Default for StreamConfig
Source§fn default() -> StreamConfig
fn default() -> StreamConfig
Auto Trait Implementations§
impl Freeze for StreamConfig
impl RefUnwindSafe for StreamConfig
impl Send for StreamConfig
impl Sync for StreamConfig
impl Unpin for StreamConfig
impl UnwindSafe for StreamConfig
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> CloneToUninit for T where
T: Clone,
impl<T> CloneToUninit for T where
T: Clone,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request