#![allow(clippy::disallowed_names)]
#![allow(clippy::let_underscore_future)]
use fred::{prelude::*, types::config::UnresponsiveConfig};
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Error> {
let client = Builder::default_centralized()
.with_performance_config(|config| {
config.max_feed_count = 100;
config.broadcast_channel_capacity = 48;
})
.with_connection_config(|config| {
config.tcp = TcpConfig {
nodelay: Some(true),
..Default::default()
};
config.max_command_attempts = 5;
config.max_redirections = 5;
config.internal_command_timeout = Duration::from_secs(2);
config.connection_timeout = Duration::from_secs(10);
config.unresponsive = UnresponsiveConfig {
max_timeout: Some(Duration::from_secs(10)),
interval: Duration::from_secs(3)
};
config.auto_client_setname = true;
config.reconnect_on_auth_error = true;
})
.set_policy(ReconnectPolicy::new_exponential(0, 100, 30_000, 2))
.build()?;
client.init().await?;
let _events_task = client.on_any(
|error| async move {
println!("Connection error: {:?}", error);
Ok(())
},
|server| async move {
println!("Reconnected to {:?}", server);
Ok(())
},
|changes| async move {
println!("Cluster changed: {:?}", changes);
Ok(())
},
);
let mut perf_config = client.perf_config();
perf_config.max_feed_count = 1000;
client.update_perf_config(perf_config);
let options = Options {
max_attempts: Some(5),
max_redirections: Some(5),
timeout: Some(Duration::from_secs(10)),
..Default::default()
};
let _: Option<String> = client.with_options(&options).get("foo").await?;
let pipeline = client.pipeline().with_options(&options);
let _: () = pipeline.get("foo").await?;
let _: () = pipeline.get("bar").await?;
let (_, _): (Option<i64>, Option<i64>) = pipeline.all().await?;
let pipeline = client.pipeline();
let _: () = pipeline.incr("foo").await?;
let _: () = pipeline.incr("foo").await?;
assert_eq!(pipeline.last::<i64>().await?, 2);
assert_eq!(pipeline.last::<i64>().await?, 4);
assert_eq!(pipeline.last::<i64>().await?, 6);
if client.is_clustered() {
let _connections = client.active_connections();
let connections = client
.cached_cluster_state()
.map(|state| state.unique_primary_nodes())
.unwrap_or_default();
for server in connections.into_iter() {
let info: String = client.with_cluster_node(&server).client_info().await?;
println!("Client info for {}: {}", server, info);
}
}
println!(
"{:?}",
client
.xreadgroup::<Value, _, _, _, _>("foo", "bar", None, None, false, "baz", ">")
.await?
);
client.quit().await?;
Ok(())
}