use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::timeout;
use ydb::{
ydb_params, ClientBuilder, ConsumerBuilder, CreateTopicOptionsBuilder,
DescribeTopicOptionsBuilder, Query, TopicWriterMessageBuilder, TopicWriterOptionsBuilder,
YdbError, YdbResult,
};
async fn setup_environment(client: &ydb::Client) -> YdbResult<()> {
let table_client = client.table_client();
let _ = table_client
.retry_execute_scheme_query("DROP TABLE topic_offset_storage")
.await;
table_client
.retry_execute_scheme_query(
"CREATE TABLE topic_offset_storage (
topic Text NOT NULL, -- Topic name for multi-topic scenarios
partition Int64 NOT NULL, -- Partition ID for parallel processing
offset Int64 NOT NULL, -- Message offset (unique per partition)
body Text, -- Message content for verification
PRIMARY KEY(topic, partition, offset)
)",
)
.await?;
println!("Table 'topic_offset_storage' created successfully");
let database_path = client.database();
let topic_name = "test_topic";
let topic_path = format!("{database_path}/{topic_name}");
let consumer_name = "test_consumer";
let mut topic_client = client.topic_client();
let _ = topic_client.drop_topic(topic_path.clone()).await;
'wait_topic_dropped: loop {
let mut scheme = client.scheme_client();
let res = scheme.list_directory(database_path.clone()).await?;
let mut topic_exists = false;
for item in res.into_iter() {
if item.name == topic_name {
topic_exists = true;
break;
}
}
if !topic_exists {
break 'wait_topic_dropped;
}
println!("Waiting for previous topic to be dropped...");
tokio::time::sleep(Duration::from_millis(100)).await;
}
topic_client
.create_topic(
topic_path.clone(),
CreateTopicOptionsBuilder::default()
.consumers(vec![ConsumerBuilder::default()
.name(consumer_name.to_string())
.build()?])
.build()?,
)
.await?;
println!("Topic '{topic_name}' created successfully");
let producer_id = "test-producer";
let mut writer = topic_client
.create_writer_with_params(
TopicWriterOptionsBuilder::default()
.auto_seq_no(false) .topic_path(topic_path.clone())
.producer_id(producer_id.to_string())
.build()?,
)
.await?;
println!("Topic writer created successfully");
writer
.write_with_ack(
TopicWriterMessageBuilder::default()
.seq_no(Some(1))
.data("Message 1: Setup environment test".as_bytes().into())
.build()?,
)
.await?;
println!("Message 1 sent and confirmed");
writer
.write_with_ack(
TopicWriterMessageBuilder::default()
.seq_no(Some(2))
.data("Message 2: Table and topic ready".as_bytes().into())
.build()?,
)
.await?;
println!("Message 2 sent and confirmed");
writer
.write_with_ack(
TopicWriterMessageBuilder::default()
.seq_no(Some(3))
.data("Message 3: Environment setup complete".as_bytes().into())
.build()?,
)
.await?;
println!("Message 3 sent and confirmed");
writer.stop().await?;
println!("All messages published successfully");
Ok(())
}
#[tokio::main]
async fn main() -> YdbResult<()> {
println!("Starting topic read in transaction example...");
let client = ClientBuilder::new_from_connection_string("grpc://localhost:2136?database=local")?
.client()?;
if let Ok(res) = timeout(Duration::from_secs(3), client.wait()).await {
res?
} else {
return Err(YdbError::from("Connection timeout"));
};
println!("Connected to database successfully");
println!("\n=== STEP 1: ENVIRONMENT SETUP ===");
setup_environment(&client).await?;
println!("✅ Environment setup completed successfully");
println!("\n=== STEP 2: TRANSACTIONAL MESSAGE PROCESSING ===");
let database_path = client.database();
let topic_name = "test_topic";
let topic_path = format!("{database_path}/{topic_name}");
let consumer_name = "test_consumer";
let mut topic_client = client.topic_client();
let table_client = client.table_client();
let reader = topic_client
.create_reader(consumer_name.to_string(), topic_path.clone())
.await?;
let reader_mutex = Arc::new(Mutex::new(reader));
println!("Topic reader created successfully");
let mut iteration = 0;
loop {
iteration += 1;
println!("Iteration {iteration}: Starting transaction...");
let result = table_client
.retry_transaction(|mut t| {
let reader_mutex = reader_mutex.clone();
async move {
let mut reader_guard = reader_mutex.lock().await;
let batch_result = timeout(
Duration::from_secs(3),
reader_guard.pop_batch_in_tx(&mut t)
).await;
match batch_result {
Ok(Ok(batch)) => {
println!(" Read batch with {} messages", batch.messages.len());
for mut message in batch.messages {
let topic = message.get_topic().to_string();
let partition_id = message.get_partition_id();
let offset = message.offset;
let message_body = message.read_and_take().await?.unwrap_or_default();
let body_str = String::from_utf8_lossy(&message_body).to_string();
t.query(
Query::new(
"
DECLARE $topic AS Text;
DECLARE $partition AS Int64;
DECLARE $offset AS Int64;
DECLARE $body AS Text;
INSERT INTO topic_offset_storage (topic, partition, offset, body)
VALUES ($topic, $partition, $offset, $body)
"
)
.with_params(ydb_params!(
"$topic" => topic.clone(),
"$partition" => partition_id,
"$offset" => offset,
"$body" => body_str.clone()
))
).await?;
println!(" Stored message: topic={}, partition={}, offset={}, body_len={}",
topic, partition_id, offset, body_str.len());
}
t.commit().await?;
println!(" Transaction committed successfully");
Ok(true) }
Ok(Err(err)) => {
println!(" Error reading batch: {err}");
Err(ydb::YdbOrCustomerError::YDB(err))
}
Err(_timeout_err) => {
println!(" Timeout reading batch - no more messages available");
Ok(false) }
}
}
})
.await;
match result {
Ok(true) => {
continue;
}
Ok(false) => {
println!("All messages have been read and stored");
break;
}
Err(err) => {
println!("Transaction failed: {err}");
return Err(ydb::YdbOrCustomerError::to_ydb_error(err));
}
}
}
println!("✅ Transactional reading completed successfully after {iteration} iterations");
println!("\n=== STEP 3: TABLE READING AND VERIFICATION ===");
#[derive(Debug, Clone)]
struct TableRow {
topic: String,
partition: i64,
offset: i64,
body: String,
}
let table_data = table_client
.retry_transaction(|mut t| {
async move {
let result_set = t
.query(Query::new(
"SELECT topic, partition, offset, body
FROM topic_offset_storage
ORDER BY topic, partition, offset",
))
.await?
.into_only_result()?;
let mut rows = Vec::new();
for mut row in result_set.rows() {
let topic: String = row.remove_field_by_name("topic")?.try_into()?;
let partition: i64 = row.remove_field_by_name("partition")?.try_into()?;
let offset: i64 = row.remove_field_by_name("offset")?.try_into()?;
let body: Option<String> = row.remove_field_by_name("body")?.try_into()?;
rows.push(TableRow {
topic,
partition,
offset,
body: body.unwrap_or_default(),
});
}
t.commit().await?;
Ok(rows)
}
})
.await;
match table_data {
Ok(rows) => {
println!("Table contents:");
println!("+-----------------------+-----------+--------+--------------------------------------------------+");
println!("| Topic | Partition | Offset | Body |");
println!("+-----------------------+-----------+--------+--------------------------------------------------+");
for row in &rows {
println!(
"| {:21} | {:9} | {:6} | {:48} |",
row.topic, row.partition, row.offset, row.body
);
}
println!("+-----------------------+-----------+--------+--------------------------------------------------+");
println!("Total messages in table: {}", rows.len());
println!("All messages have been successfully processed and stored in the table");
println!(
"Table reading completed successfully, {} rows retrieved",
rows.len()
);
}
Err(err) => {
println!("❌ Table reading transaction failed: {err}");
return Err(ydb::YdbOrCustomerError::to_ydb_error(err));
}
}
println!("\n=== STEP 4: TOPIC STATUS VERIFICATION ===");
match topic_client
.describe_topic(
topic_path.clone(),
DescribeTopicOptionsBuilder::default()
.include_stats(true) .build()?,
)
.await
{
Ok(topic_description) => {
println!("=== Topic Status ===");
println!("Topic '{topic_name}':");
let mut total_messages = 0;
let mut last_offset = -1;
for partition in &topic_description.partitions {
if let Some(stats) = &partition.stats {
let partition_messages = stats.end_offset - stats.start_offset;
total_messages += partition_messages;
if stats.end_offset > last_offset + 1 {
last_offset = stats.end_offset - 1; }
}
}
println!(" Total messages: {total_messages}");
println!(" Committed messages: {total_messages}"); if last_offset >= 0 {
println!(" Last offset: {last_offset}");
println!(" Last committed offset: {last_offset}");
}
println!(" Partitions: {}", topic_description.partitions.len());
for partition in &topic_description.partitions {
println!(
" Partition {}: Active={}",
partition.partition_id, partition.active
);
if let Some(stats) = &partition.stats {
println!(
" Offset range: {} to {}",
stats.start_offset,
stats.end_offset - 1
);
println!(
" Messages in partition: {}",
stats.end_offset - stats.start_offset
);
}
}
println!(" Consumers: {}", topic_description.consumers.len());
for consumer in &topic_description.consumers {
println!(" Consumer: {}", consumer.name);
if !consumer.supported_codecs.is_empty() {
println!(
" Supported codecs: {} codecs",
consumer.supported_codecs.len()
);
}
}
println!("Topic status retrieved successfully");
}
Err(err) => {
println!("Failed to get topic status: {err}");
}
}
println!("\n=== WORKFLOW COMPLETED SUCCESSFULLY ===");
println!("Summary:");
println!(" ✅ Environment setup completed");
println!(" ✅ Messages published to topic");
println!(" ✅ Messages read and stored in table via transactions");
println!(" ✅ Table contents verified");
println!(" ✅ Topic status checked");
println!("\n📚 Key Learning Points:");
println!(" • Each message batch was processed in its own transaction");
println!(" • Timeout on message reading is normal (indicates no more data)");
println!(" • Transactions prevent duplicate processing during retries");
println!(" • Primary key (topic, partition, offset) is the unique message identifier");
println!(" • Data display happens outside transactions for better performance");
println!(" • Topic statistics help verify processing correctness");
Ok(())
}