AppendSession

Struct AppendSession 

Source
pub struct AppendSession { /* private fields */ }
Expand description

A session for high-throughput appending with backpressure control. It is created by calling append_session on a stream.

Supports pipelining multiple AppendInputs while preserving submission order.

Implementations§

Source§

impl AppendSession

Source

pub async fn submit( &self, input: AppendInput, ) -> Result<BatchSubmitTicket, S2Error>

Submit a batch of records for appending.

Internally, it waits on reserve, then submits using the permit. This provides backpressure when inflight limits are reached. For explicit control, use reserve followed by BatchSubmitPermit::submit.

Note: After submitting all batches, you must call close to ensure that every batch is appended.

Examples found in repository?
examples/docs_streams.rs (line 79)
20async fn main() -> Result<(), Box<dyn std::error::Error>> {
21    let token = std::env::var("S2_ACCESS_TOKEN")?;
22    let basin_name: BasinName = std::env::var("S2_BASIN")?.parse()?;
23
24    let client = S2::new(S2Config::new(token))?;
25    let basin = client.basin(basin_name);
26
27    // Create a temporary stream for examples
28    let stream_name: StreamName = format!(
29        "docs-streams-{}",
30        std::time::SystemTime::now()
31            .duration_since(std::time::UNIX_EPOCH)?
32            .as_millis()
33    )
34    .parse()?;
35    basin
36        .create_stream(s2_sdk::types::CreateStreamInput::new(stream_name.clone()))
37        .await?;
38
39    // ANCHOR: simple-append
40    let stream = basin.stream(stream_name.clone());
41
42    let records = AppendRecordBatch::try_from_iter([
43        AppendRecord::new("first event")?,
44        AppendRecord::new("second event")?,
45    ])?;
46
47    let ack = stream.append(AppendInput::new(records)).await?;
48
49    // ack tells us where the records landed
50    println!(
51        "Wrote records {} through {}",
52        ack.start.seq_num,
53        ack.end.seq_num - 1
54    );
55    // ANCHOR_END: simple-append
56
57    // ANCHOR: simple-read
58    let batch = stream
59        .read(
60            ReadInput::new()
61                .with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
62                .with_stop(ReadStop::new().with_limits(ReadLimits::new().with_count(100))),
63        )
64        .await?;
65
66    for record in batch.records {
67        println!("[{}] {:?}", record.seq_num, record.body);
68    }
69    // ANCHOR_END: simple-read
70
71    // ANCHOR: append-session
72    let session = stream.append_session(AppendSessionConfig::new());
73
74    // Submit a batch - this enqueues it and returns a ticket
75    let records = AppendRecordBatch::try_from_iter([
76        AppendRecord::new("event-1")?,
77        AppendRecord::new("event-2")?,
78    ])?;
79    let ticket = session.submit(AppendInput::new(records)).await?;
80
81    // Wait for durability
82    let ack = ticket.await?;
83    println!("Durable at seqNum {}", ack.start.seq_num);
84
85    session.close().await?;
86    // ANCHOR_END: append-session
87
88    // ANCHOR: producer
89    let producer = stream.producer(
90        ProducerConfig::new()
91            .with_batching(BatchingConfig::new().with_linger(Duration::from_millis(5))),
92    );
93
94    // Submit individual records
95    let ticket = producer.submit(AppendRecord::new("my event")?).await?;
96
97    // Get the exact sequence number
98    let ack = ticket.await?;
99    println!("Record durable at seqNum {}", ack.seq_num);
100
101    producer.close().await?;
102    // ANCHOR_END: producer
103
104    // ANCHOR: check-tail
105    let tail = stream.check_tail().await?;
106    println!("Stream has {} records", tail.seq_num);
107    // ANCHOR_END: check-tail
108
109    // Cleanup
110    basin
111        .delete_stream(s2_sdk::types::DeleteStreamInput::new(stream_name))
112        .await?;
113
114    println!("Streams examples completed");
115
116    // The following read session examples are for documentation snippets only.
117    // They are not executed because they would block waiting for new records.
118    if std::env::var("RUN_READ_SESSIONS").is_err() {
119        return Ok(());
120    }
121
122    // ANCHOR: read-session
123    let mut session = stream
124        .read_session(ReadInput::new().with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0))))
125        .await?;
126
127    while let Some(batch) = session.next().await {
128        let batch = batch?;
129        for record in batch.records {
130            println!("[{}] {:?}", record.seq_num, record.body);
131        }
132    }
133    // ANCHOR_END: read-session
134
135    // ANCHOR: read-session-tail-offset
136    // Start reading from 10 records before the current tail
137    let mut session = stream
138        .read_session(
139            ReadInput::new().with_start(ReadStart::new().with_from(ReadFrom::TailOffset(10))),
140        )
141        .await?;
142
143    while let Some(batch) = session.next().await {
144        let batch = batch?;
145        for record in batch.records {
146            println!("[{}] {:?}", record.seq_num, record.body);
147        }
148    }
149    // ANCHOR_END: read-session-tail-offset
150
151    // ANCHOR: read-session-timestamp
152    // Start reading from a specific timestamp
153    let one_hour_ago = std::time::SystemTime::now()
154        .duration_since(std::time::UNIX_EPOCH)?
155        .as_millis() as u64
156        - 3600 * 1000;
157    let mut session = stream
158        .read_session(
159            ReadInput::new()
160                .with_start(ReadStart::new().with_from(ReadFrom::Timestamp(one_hour_ago))),
161        )
162        .await?;
163
164    while let Some(batch) = session.next().await {
165        let batch = batch?;
166        for record in batch.records {
167            println!("[{}] {:?}", record.seq_num, record.body);
168        }
169    }
170    // ANCHOR_END: read-session-timestamp
171
172    // ANCHOR: read-session-until
173    // Read records until a specific timestamp
174    let one_hour_ago = std::time::SystemTime::now()
175        .duration_since(std::time::UNIX_EPOCH)?
176        .as_millis() as u64
177        - 3600 * 1000;
178    let mut session = stream
179        .read_session(
180            ReadInput::new()
181                .with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
182                .with_stop(ReadStop::new().with_until(..one_hour_ago)),
183        )
184        .await?;
185
186    while let Some(batch) = session.next().await {
187        let batch = batch?;
188        for record in batch.records {
189            println!("[{}] {:?}", record.seq_num, record.body);
190        }
191    }
192    // ANCHOR_END: read-session-until
193
194    // ANCHOR: read-session-wait
195    // Read all available records, and once reaching the current tail, wait an additional 30 seconds
196    // for new ones
197    let mut session = stream
198        .read_session(
199            ReadInput::new()
200                .with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
201                .with_stop(ReadStop::new().with_wait(30)),
202        )
203        .await?;
204
205    while let Some(batch) = session.next().await {
206        let batch = batch?;
207        for record in batch.records {
208            println!("[{}] {:?}", record.seq_num, record.body);
209        }
210    }
211    // ANCHOR_END: read-session-wait
212
213    Ok(())
214}
Source

pub async fn reserve(&self, bytes: u32) -> Result<BatchSubmitPermit, S2Error>

Reserve capacity for a batch to be submitted. Useful in select! loops where you want to interleave submission with other async work. See submit for a simpler API.

Waits when inflight limits are reached, providing explicit backpressure control. The returned permit must be used to submit the batch.

Note: After submitting all batches, you must call close to ensure that every batch is appended.

§Cancel safety

This method is cancel safe. Internally, it only awaits Semaphore::acquire_many_owned and Sender::reserve_owned, both of which are cancel safe.

Source

pub async fn close(self) -> Result<(), S2Error>

Close the session and wait for all submitted batches of records to be appended.

Examples found in repository?
examples/docs_streams.rs (line 85)
20async fn main() -> Result<(), Box<dyn std::error::Error>> {
21    let token = std::env::var("S2_ACCESS_TOKEN")?;
22    let basin_name: BasinName = std::env::var("S2_BASIN")?.parse()?;
23
24    let client = S2::new(S2Config::new(token))?;
25    let basin = client.basin(basin_name);
26
27    // Create a temporary stream for examples
28    let stream_name: StreamName = format!(
29        "docs-streams-{}",
30        std::time::SystemTime::now()
31            .duration_since(std::time::UNIX_EPOCH)?
32            .as_millis()
33    )
34    .parse()?;
35    basin
36        .create_stream(s2_sdk::types::CreateStreamInput::new(stream_name.clone()))
37        .await?;
38
39    // ANCHOR: simple-append
40    let stream = basin.stream(stream_name.clone());
41
42    let records = AppendRecordBatch::try_from_iter([
43        AppendRecord::new("first event")?,
44        AppendRecord::new("second event")?,
45    ])?;
46
47    let ack = stream.append(AppendInput::new(records)).await?;
48
49    // ack tells us where the records landed
50    println!(
51        "Wrote records {} through {}",
52        ack.start.seq_num,
53        ack.end.seq_num - 1
54    );
55    // ANCHOR_END: simple-append
56
57    // ANCHOR: simple-read
58    let batch = stream
59        .read(
60            ReadInput::new()
61                .with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
62                .with_stop(ReadStop::new().with_limits(ReadLimits::new().with_count(100))),
63        )
64        .await?;
65
66    for record in batch.records {
67        println!("[{}] {:?}", record.seq_num, record.body);
68    }
69    // ANCHOR_END: simple-read
70
71    // ANCHOR: append-session
72    let session = stream.append_session(AppendSessionConfig::new());
73
74    // Submit a batch - this enqueues it and returns a ticket
75    let records = AppendRecordBatch::try_from_iter([
76        AppendRecord::new("event-1")?,
77        AppendRecord::new("event-2")?,
78    ])?;
79    let ticket = session.submit(AppendInput::new(records)).await?;
80
81    // Wait for durability
82    let ack = ticket.await?;
83    println!("Durable at seqNum {}", ack.start.seq_num);
84
85    session.close().await?;
86    // ANCHOR_END: append-session
87
88    // ANCHOR: producer
89    let producer = stream.producer(
90        ProducerConfig::new()
91            .with_batching(BatchingConfig::new().with_linger(Duration::from_millis(5))),
92    );
93
94    // Submit individual records
95    let ticket = producer.submit(AppendRecord::new("my event")?).await?;
96
97    // Get the exact sequence number
98    let ack = ticket.await?;
99    println!("Record durable at seqNum {}", ack.seq_num);
100
101    producer.close().await?;
102    // ANCHOR_END: producer
103
104    // ANCHOR: check-tail
105    let tail = stream.check_tail().await?;
106    println!("Stream has {} records", tail.seq_num);
107    // ANCHOR_END: check-tail
108
109    // Cleanup
110    basin
111        .delete_stream(s2_sdk::types::DeleteStreamInput::new(stream_name))
112        .await?;
113
114    println!("Streams examples completed");
115
116    // The following read session examples are for documentation snippets only.
117    // They are not executed because they would block waiting for new records.
118    if std::env::var("RUN_READ_SESSIONS").is_err() {
119        return Ok(());
120    }
121
122    // ANCHOR: read-session
123    let mut session = stream
124        .read_session(ReadInput::new().with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0))))
125        .await?;
126
127    while let Some(batch) = session.next().await {
128        let batch = batch?;
129        for record in batch.records {
130            println!("[{}] {:?}", record.seq_num, record.body);
131        }
132    }
133    // ANCHOR_END: read-session
134
135    // ANCHOR: read-session-tail-offset
136    // Start reading from 10 records before the current tail
137    let mut session = stream
138        .read_session(
139            ReadInput::new().with_start(ReadStart::new().with_from(ReadFrom::TailOffset(10))),
140        )
141        .await?;
142
143    while let Some(batch) = session.next().await {
144        let batch = batch?;
145        for record in batch.records {
146            println!("[{}] {:?}", record.seq_num, record.body);
147        }
148    }
149    // ANCHOR_END: read-session-tail-offset
150
151    // ANCHOR: read-session-timestamp
152    // Start reading from a specific timestamp
153    let one_hour_ago = std::time::SystemTime::now()
154        .duration_since(std::time::UNIX_EPOCH)?
155        .as_millis() as u64
156        - 3600 * 1000;
157    let mut session = stream
158        .read_session(
159            ReadInput::new()
160                .with_start(ReadStart::new().with_from(ReadFrom::Timestamp(one_hour_ago))),
161        )
162        .await?;
163
164    while let Some(batch) = session.next().await {
165        let batch = batch?;
166        for record in batch.records {
167            println!("[{}] {:?}", record.seq_num, record.body);
168        }
169    }
170    // ANCHOR_END: read-session-timestamp
171
172    // ANCHOR: read-session-until
173    // Read records until a specific timestamp
174    let one_hour_ago = std::time::SystemTime::now()
175        .duration_since(std::time::UNIX_EPOCH)?
176        .as_millis() as u64
177        - 3600 * 1000;
178    let mut session = stream
179        .read_session(
180            ReadInput::new()
181                .with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
182                .with_stop(ReadStop::new().with_until(..one_hour_ago)),
183        )
184        .await?;
185
186    while let Some(batch) = session.next().await {
187        let batch = batch?;
188        for record in batch.records {
189            println!("[{}] {:?}", record.seq_num, record.body);
190        }
191    }
192    // ANCHOR_END: read-session-until
193
194    // ANCHOR: read-session-wait
195    // Read all available records, and once reaching the current tail, wait an additional 30 seconds
196    // for new ones
197    let mut session = stream
198        .read_session(
199            ReadInput::new()
200                .with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
201                .with_stop(ReadStop::new().with_wait(30)),
202        )
203        .await?;
204
205    while let Some(batch) = session.next().await {
206        let batch = batch?;
207        for record in batch.records {
208            println!("[{}] {:?}", record.seq_num, record.body);
209        }
210    }
211    // ANCHOR_END: read-session-wait
212
213    Ok(())
214}

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more