dynamo_subscriber/stream/
mod.rs

1//! # DynamodbStream
2//!
3//! [`DynamodbStream`](crate::stream::DynamodbStream) represents Amazon DynamoDB Streams and you can receive records from it by
4//! using it as a [tokio stream](https://docs.rs/tokio-stream/0.1.14/tokio_stream/index.html).
5//!
6//! ```rust,no_run
7//! # use aws_config::BehaviorVersion;
8//! use dynamo_subscriber as subscriber;
9//! use tokio_stream::StreamExt;
10//!
11//! # async fn wrapper() {
12//! # let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
13//! # let client = subscriber::Client::new(&config);
14//! // Create a stream from builder.
15//! let mut stream = subscriber::stream::builder()
16//!     .client(client)
17//!     .table_name("People")
18//!     .build();
19//!
20//! // Subscribe the stream to receive DynamoDB Streams records.
21//! while let Some(records) = stream.next().await {
22//!     println!("{:#?}", records);
23//! }
24//! # }
25//! ```
26//!
27//! ## Stop polling the DynamoDB table
28//!
29//! Once you construct a stream, it polls the DynamoDB table and receives records permanently.
30//! If you want to stop polling, extract a communication channel to the polling half of the
31//! stream, and then send `Stop polling` event from the channel.
32//!
33//! ```rust,no_run
34//! # use aws_config::BehaviorVersion;
35//! use dynamo_subscriber as subscriber;
36//! use tokio_stream::StreamExt;
37//!
38//! # async fn wrapper() {
39//! # let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
40//! # let client = subscriber::Client::new(&config);
41//! // Create a stream from builder.
42//! let mut stream = subscriber::stream::builder()
43//!     .client(client)
44//!     .table_name("People")
45//!     .build();
46//!
47//! // Extract a communication channel from the stream.
48//! let mut channel = stream.take_channel().unwrap();
49//!
50//! // Receive records from the stream.
51//! let records = stream.next().await;
52//! println!("{:#?}", records);
53//!
54//! // Send `Stop polling` event from the communication channel.
55//! channel.close(|| {});
56//!
57//! // Now the stream is closed and no more records can be sent to the stream.
58//! let records = stream.next().await;
59//! assert!(records.is_none());
60//! # }
61//! ```
62
63mod channel;
64mod dynamodb;
65
66use super::{client::DynamodbClient, error::Error, types};
67
68pub use channel::ConsumerChannel;
69pub use dynamodb::{DynamodbStream, DynamodbStreamBuilder};
70
71/// Create [`DynamodbStreamBuilder`].
72pub fn builder<C: DynamodbClient + 'static>() -> DynamodbStreamBuilder<C> {
73    DynamodbStreamBuilder::new()
74}