#![allow(clippy::mutable_key_type)]
use bytes_utils::Str;
use fred::{prelude::*, types::streams::XReadResponse};
use futures::future::try_join_all;
use std::time::Duration;
use tokio::time::sleep;
static VALUES: &[&str] = &["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
#[tokio::main]
async fn main() {
pretty_env_logger::init();
let reader_task = tokio::spawn(async move {
let client = Builder::default_centralized()
.with_config(|config| {
config.password = Some("bar".into());
})
.build()?;
client.init().await?;
let _: () = client.del("foo").await?;
let _: () = client.xgroup_create("foo", "group", "$", true).await?;
let mut count = 0;
loop {
let entry: XReadResponse<Str, Str, Str, Str> = client.xread_map(Some(1), Some(10_000), "foo", "$").await?;
count += 1;
for (key, records) in entry.into_iter() {
for (id, fields) in records.into_iter() {
println!("Reader recv {} - {}: {:?}", key, id, fields);
}
}
if count * 2 >= VALUES.len() {
break;
}
}
client.quit().await?;
Ok::<_, Error>(())
});
let writer_task = tokio::spawn(async move {
sleep(Duration::from_secs(1)).await;
let client = Builder::default_centralized()
.with_config(|config| {
config.password = Some("bar".into());
})
.build()?;
client.init().await?;
for values in VALUES.chunks(2) {
let id: Str = client
.xadd("foo", false, None, "*", vec![
("field1", values[0]),
("field2", values[1]),
])
.await?;
println!("Writer added stream entry with ID: {}", id);
sleep(Duration::from_secs(1)).await;
}
client.quit().await?;
Ok::<_, Error>(())
});
try_join_all([writer_task, reader_task]).await.unwrap();
}