#[non_exhaustive]pub struct ReadLimits {
pub count: Option<usize>,
pub bytes: Option<usize>,
}Expand description
Limits on how much to read.
Fields (Non-exhaustive)§
This struct is marked as non-exhaustive
Non-exhaustive structs could have additional fields added in future. Therefore, non-exhaustive structs cannot be constructed in external crates using the traditional
Struct { .. } syntax; cannot be matched against without a wildcard ..; and struct update syntax will not work.count: Option<usize>Limit on number of records.
Defaults to 1000 for non-streaming read.
bytes: Option<usize>Limit on total metered bytes of records.
Defaults to 1MiB for non-streaming read.
Implementations§
Source§impl ReadLimits
impl ReadLimits
Sourcepub fn new() -> Self
pub fn new() -> Self
Create a new ReadLimits with default values.
Examples found in repository?
examples/get_latest_record.rs (line 24)
9async fn main() -> Result<(), Box<dyn std::error::Error>> {
10 let access_token =
11 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
12 let basin_name: BasinName = std::env::var("S2_BASIN")
13 .map_err(|_| "S2_BASIN env var not set")?
14 .parse()?;
15 let stream_name: StreamName = std::env::var("S2_STREAM")
16 .map_err(|_| "S2_STREAM env var not set")?
17 .parse()?;
18
19 let s2 = S2::new(S2Config::new(access_token))?;
20 let stream = s2.basin(basin_name).stream(stream_name);
21
22 let input = ReadInput::new()
23 .with_start(ReadStart::new().with_from(ReadFrom::TailOffset(1)))
24 .with_stop(ReadStop::new().with_limits(ReadLimits::new().with_count(1)));
25 let batch = stream.read(input).await?;
26 println!("{batch:#?}");
27
28 Ok(())
29}More examples
examples/docs_streams.rs (line 62)
20async fn main() -> Result<(), Box<dyn std::error::Error>> {
21 let token = std::env::var("S2_ACCESS_TOKEN")?;
22 let basin_name: BasinName = std::env::var("S2_BASIN")?.parse()?;
23
24 let client = S2::new(S2Config::new(token))?;
25 let basin = client.basin(basin_name);
26
27 // Create a temporary stream for examples
28 let stream_name: StreamName = format!(
29 "docs-streams-{}",
30 std::time::SystemTime::now()
31 .duration_since(std::time::UNIX_EPOCH)?
32 .as_millis()
33 )
34 .parse()?;
35 basin
36 .create_stream(s2_sdk::types::CreateStreamInput::new(stream_name.clone()))
37 .await?;
38
39 // ANCHOR: simple-append
40 let stream = basin.stream(stream_name.clone());
41
42 let records = AppendRecordBatch::try_from_iter([
43 AppendRecord::new("first event")?,
44 AppendRecord::new("second event")?,
45 ])?;
46
47 let ack = stream.append(AppendInput::new(records)).await?;
48
49 // ack tells us where the records landed
50 println!(
51 "Wrote records {} through {}",
52 ack.start.seq_num,
53 ack.end.seq_num - 1
54 );
55 // ANCHOR_END: simple-append
56
57 // ANCHOR: simple-read
58 let batch = stream
59 .read(
60 ReadInput::new()
61 .with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
62 .with_stop(ReadStop::new().with_limits(ReadLimits::new().with_count(100))),
63 )
64 .await?;
65
66 for record in batch.records {
67 println!("[{}] {:?}", record.seq_num, record.body);
68 }
69 // ANCHOR_END: simple-read
70
71 // ANCHOR: append-session
72 let session = stream.append_session(AppendSessionConfig::new());
73
74 // Submit a batch - this enqueues it and returns a ticket
75 let records = AppendRecordBatch::try_from_iter([
76 AppendRecord::new("event-1")?,
77 AppendRecord::new("event-2")?,
78 ])?;
79 let ticket = session.submit(AppendInput::new(records)).await?;
80
81 // Wait for durability
82 let ack = ticket.await?;
83 println!("Durable at seqNum {}", ack.start.seq_num);
84
85 session.close().await?;
86 // ANCHOR_END: append-session
87
88 // ANCHOR: producer
89 let producer = stream.producer(
90 ProducerConfig::new()
91 .with_batching(BatchingConfig::new().with_linger(Duration::from_millis(5))),
92 );
93
94 // Submit individual records
95 let ticket = producer.submit(AppendRecord::new("my event")?).await?;
96
97 // Get the exact sequence number
98 let ack = ticket.await?;
99 println!("Record durable at seqNum {}", ack.seq_num);
100
101 producer.close().await?;
102 // ANCHOR_END: producer
103
104 // ANCHOR: check-tail
105 let tail = stream.check_tail().await?;
106 println!("Stream has {} records", tail.seq_num);
107 // ANCHOR_END: check-tail
108
109 // Cleanup
110 basin
111 .delete_stream(s2_sdk::types::DeleteStreamInput::new(stream_name))
112 .await?;
113
114 println!("Streams examples completed");
115
116 // The following read session examples are for documentation snippets only.
117 // They are not executed because they would block waiting for new records.
118 if std::env::var("RUN_READ_SESSIONS").is_err() {
119 return Ok(());
120 }
121
122 // ANCHOR: read-session
123 let mut session = stream
124 .read_session(ReadInput::new().with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0))))
125 .await?;
126
127 while let Some(batch) = session.next().await {
128 let batch = batch?;
129 for record in batch.records {
130 println!("[{}] {:?}", record.seq_num, record.body);
131 }
132 }
133 // ANCHOR_END: read-session
134
135 // ANCHOR: read-session-tail-offset
136 // Start reading from 10 records before the current tail
137 let mut session = stream
138 .read_session(
139 ReadInput::new().with_start(ReadStart::new().with_from(ReadFrom::TailOffset(10))),
140 )
141 .await?;
142
143 while let Some(batch) = session.next().await {
144 let batch = batch?;
145 for record in batch.records {
146 println!("[{}] {:?}", record.seq_num, record.body);
147 }
148 }
149 // ANCHOR_END: read-session-tail-offset
150
151 // ANCHOR: read-session-timestamp
152 // Start reading from a specific timestamp
153 let one_hour_ago = std::time::SystemTime::now()
154 .duration_since(std::time::UNIX_EPOCH)?
155 .as_millis() as u64
156 - 3600 * 1000;
157 let mut session = stream
158 .read_session(
159 ReadInput::new()
160 .with_start(ReadStart::new().with_from(ReadFrom::Timestamp(one_hour_ago))),
161 )
162 .await?;
163
164 while let Some(batch) = session.next().await {
165 let batch = batch?;
166 for record in batch.records {
167 println!("[{}] {:?}", record.seq_num, record.body);
168 }
169 }
170 // ANCHOR_END: read-session-timestamp
171
172 // ANCHOR: read-session-until
173 // Read records until a specific timestamp
174 let one_hour_ago = std::time::SystemTime::now()
175 .duration_since(std::time::UNIX_EPOCH)?
176 .as_millis() as u64
177 - 3600 * 1000;
178 let mut session = stream
179 .read_session(
180 ReadInput::new()
181 .with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
182 .with_stop(ReadStop::new().with_until(..one_hour_ago)),
183 )
184 .await?;
185
186 while let Some(batch) = session.next().await {
187 let batch = batch?;
188 for record in batch.records {
189 println!("[{}] {:?}", record.seq_num, record.body);
190 }
191 }
192 // ANCHOR_END: read-session-until
193
194 // ANCHOR: read-session-wait
195 // Read all available records, and once reaching the current tail, wait an additional 30 seconds
196 // for new ones
197 let mut session = stream
198 .read_session(
199 ReadInput::new()
200 .with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
201 .with_stop(ReadStop::new().with_wait(30)),
202 )
203 .await?;
204
205 while let Some(batch) = session.next().await {
206 let batch = batch?;
207 for record in batch.records {
208 println!("[{}] {:?}", record.seq_num, record.body);
209 }
210 }
211 // ANCHOR_END: read-session-wait
212
213 Ok(())
214}Sourcepub fn with_count(self, count: usize) -> Self
pub fn with_count(self, count: usize) -> Self
Set the limit on number of records.
Examples found in repository?
examples/get_latest_record.rs (line 24)
9async fn main() -> Result<(), Box<dyn std::error::Error>> {
10 let access_token =
11 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
12 let basin_name: BasinName = std::env::var("S2_BASIN")
13 .map_err(|_| "S2_BASIN env var not set")?
14 .parse()?;
15 let stream_name: StreamName = std::env::var("S2_STREAM")
16 .map_err(|_| "S2_STREAM env var not set")?
17 .parse()?;
18
19 let s2 = S2::new(S2Config::new(access_token))?;
20 let stream = s2.basin(basin_name).stream(stream_name);
21
22 let input = ReadInput::new()
23 .with_start(ReadStart::new().with_from(ReadFrom::TailOffset(1)))
24 .with_stop(ReadStop::new().with_limits(ReadLimits::new().with_count(1)));
25 let batch = stream.read(input).await?;
26 println!("{batch:#?}");
27
28 Ok(())
29}More examples
examples/docs_streams.rs (line 62)
20async fn main() -> Result<(), Box<dyn std::error::Error>> {
21 let token = std::env::var("S2_ACCESS_TOKEN")?;
22 let basin_name: BasinName = std::env::var("S2_BASIN")?.parse()?;
23
24 let client = S2::new(S2Config::new(token))?;
25 let basin = client.basin(basin_name);
26
27 // Create a temporary stream for examples
28 let stream_name: StreamName = format!(
29 "docs-streams-{}",
30 std::time::SystemTime::now()
31 .duration_since(std::time::UNIX_EPOCH)?
32 .as_millis()
33 )
34 .parse()?;
35 basin
36 .create_stream(s2_sdk::types::CreateStreamInput::new(stream_name.clone()))
37 .await?;
38
39 // ANCHOR: simple-append
40 let stream = basin.stream(stream_name.clone());
41
42 let records = AppendRecordBatch::try_from_iter([
43 AppendRecord::new("first event")?,
44 AppendRecord::new("second event")?,
45 ])?;
46
47 let ack = stream.append(AppendInput::new(records)).await?;
48
49 // ack tells us where the records landed
50 println!(
51 "Wrote records {} through {}",
52 ack.start.seq_num,
53 ack.end.seq_num - 1
54 );
55 // ANCHOR_END: simple-append
56
57 // ANCHOR: simple-read
58 let batch = stream
59 .read(
60 ReadInput::new()
61 .with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
62 .with_stop(ReadStop::new().with_limits(ReadLimits::new().with_count(100))),
63 )
64 .await?;
65
66 for record in batch.records {
67 println!("[{}] {:?}", record.seq_num, record.body);
68 }
69 // ANCHOR_END: simple-read
70
71 // ANCHOR: append-session
72 let session = stream.append_session(AppendSessionConfig::new());
73
74 // Submit a batch - this enqueues it and returns a ticket
75 let records = AppendRecordBatch::try_from_iter([
76 AppendRecord::new("event-1")?,
77 AppendRecord::new("event-2")?,
78 ])?;
79 let ticket = session.submit(AppendInput::new(records)).await?;
80
81 // Wait for durability
82 let ack = ticket.await?;
83 println!("Durable at seqNum {}", ack.start.seq_num);
84
85 session.close().await?;
86 // ANCHOR_END: append-session
87
88 // ANCHOR: producer
89 let producer = stream.producer(
90 ProducerConfig::new()
91 .with_batching(BatchingConfig::new().with_linger(Duration::from_millis(5))),
92 );
93
94 // Submit individual records
95 let ticket = producer.submit(AppendRecord::new("my event")?).await?;
96
97 // Get the exact sequence number
98 let ack = ticket.await?;
99 println!("Record durable at seqNum {}", ack.seq_num);
100
101 producer.close().await?;
102 // ANCHOR_END: producer
103
104 // ANCHOR: check-tail
105 let tail = stream.check_tail().await?;
106 println!("Stream has {} records", tail.seq_num);
107 // ANCHOR_END: check-tail
108
109 // Cleanup
110 basin
111 .delete_stream(s2_sdk::types::DeleteStreamInput::new(stream_name))
112 .await?;
113
114 println!("Streams examples completed");
115
116 // The following read session examples are for documentation snippets only.
117 // They are not executed because they would block waiting for new records.
118 if std::env::var("RUN_READ_SESSIONS").is_err() {
119 return Ok(());
120 }
121
122 // ANCHOR: read-session
123 let mut session = stream
124 .read_session(ReadInput::new().with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0))))
125 .await?;
126
127 while let Some(batch) = session.next().await {
128 let batch = batch?;
129 for record in batch.records {
130 println!("[{}] {:?}", record.seq_num, record.body);
131 }
132 }
133 // ANCHOR_END: read-session
134
135 // ANCHOR: read-session-tail-offset
136 // Start reading from 10 records before the current tail
137 let mut session = stream
138 .read_session(
139 ReadInput::new().with_start(ReadStart::new().with_from(ReadFrom::TailOffset(10))),
140 )
141 .await?;
142
143 while let Some(batch) = session.next().await {
144 let batch = batch?;
145 for record in batch.records {
146 println!("[{}] {:?}", record.seq_num, record.body);
147 }
148 }
149 // ANCHOR_END: read-session-tail-offset
150
151 // ANCHOR: read-session-timestamp
152 // Start reading from a specific timestamp
153 let one_hour_ago = std::time::SystemTime::now()
154 .duration_since(std::time::UNIX_EPOCH)?
155 .as_millis() as u64
156 - 3600 * 1000;
157 let mut session = stream
158 .read_session(
159 ReadInput::new()
160 .with_start(ReadStart::new().with_from(ReadFrom::Timestamp(one_hour_ago))),
161 )
162 .await?;
163
164 while let Some(batch) = session.next().await {
165 let batch = batch?;
166 for record in batch.records {
167 println!("[{}] {:?}", record.seq_num, record.body);
168 }
169 }
170 // ANCHOR_END: read-session-timestamp
171
172 // ANCHOR: read-session-until
173 // Read records until a specific timestamp
174 let one_hour_ago = std::time::SystemTime::now()
175 .duration_since(std::time::UNIX_EPOCH)?
176 .as_millis() as u64
177 - 3600 * 1000;
178 let mut session = stream
179 .read_session(
180 ReadInput::new()
181 .with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
182 .with_stop(ReadStop::new().with_until(..one_hour_ago)),
183 )
184 .await?;
185
186 while let Some(batch) = session.next().await {
187 let batch = batch?;
188 for record in batch.records {
189 println!("[{}] {:?}", record.seq_num, record.body);
190 }
191 }
192 // ANCHOR_END: read-session-until
193
194 // ANCHOR: read-session-wait
195 // Read all available records, and once reaching the current tail, wait an additional 30 seconds
196 // for new ones
197 let mut session = stream
198 .read_session(
199 ReadInput::new()
200 .with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
201 .with_stop(ReadStop::new().with_wait(30)),
202 )
203 .await?;
204
205 while let Some(batch) = session.next().await {
206 let batch = batch?;
207 for record in batch.records {
208 println!("[{}] {:?}", record.seq_num, record.body);
209 }
210 }
211 // ANCHOR_END: read-session-wait
212
213 Ok(())
214}Sourcepub fn with_bytes(self, bytes: usize) -> Self
pub fn with_bytes(self, bytes: usize) -> Self
Set the limit on total metered bytes of records.
Trait Implementations§
Source§impl Clone for ReadLimits
impl Clone for ReadLimits
Source§fn clone(&self) -> ReadLimits
fn clone(&self) -> ReadLimits
Returns a duplicate of the value. Read more
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from
source. Read moreSource§impl Debug for ReadLimits
impl Debug for ReadLimits
Source§impl Default for ReadLimits
impl Default for ReadLimits
Source§fn default() -> ReadLimits
fn default() -> ReadLimits
Returns the “default value” for a type. Read more
Auto Trait Implementations§
impl Freeze for ReadLimits
impl RefUnwindSafe for ReadLimits
impl Send for ReadLimits
impl Sync for ReadLimits
impl Unpin for ReadLimits
impl UnwindSafe for ReadLimits
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more