#![allow(clippy::disallowed_names)]
use async_trait::async_trait;
use fred::{
prelude::*,
types::{
config::{DynamicPoolConfig, PoolScale},
stats::PoolStats,
},
};
use log::{debug, warn};
use parking_lot::Mutex;
use std::{ops::Add, sync::Arc, time::Duration};
use tokio::time::sleep;
/// How often the pool samples usage stats and re-runs the scale policy
/// (passed to `pool.start_scale_task` in `main`).
const SAMPLE_INTERVAL: Duration = Duration::from_secs(10);

/// Example scaling policy: grow the pool when average network latency spikes
/// or command volume is high, shrink it when traffic is light.
#[derive(Debug, Default)]
struct ScalePolicy {
    // Average network latency observed during the previous sampling interval,
    // used as the baseline for detecting a latency increase. `None` until the
    // first sample has been recorded.
    last_avg_latency: Mutex<Option<f64>>,
}
impl ScalePolicy {
    /// Returns `true` when the average network latency across all pooled
    /// clients has grown by more than 50% since the previous sample.
    ///
    /// The current average is stored so the next call compares against it.
    pub fn latency_increased(&self, usage: &PoolStats) -> bool {
        // Guard the division below: with no clients `0.0 / 0.0` is NaN, which
        // would be stored in `last_avg_latency` and poison every later
        // comparison (all comparisons against NaN are false).
        if usage.clients.is_empty() {
            return false;
        }

        let latency_avg_sum = usage
            .clients
            .iter()
            .fold(0.0, |sum, (_, stats)| sum + stats.network_latency.avg);
        let latency_avg = latency_avg_sum / usage.clients.len() as f64;

        let mut guard = self.last_avg_latency.lock();
        let should_scale_up = match guard.as_ref() {
            // Only compare against a meaningful (positive) previous sample.
            Some(old_latency_avg) => *old_latency_avg > 0.0 && latency_avg > old_latency_avg * 1.5,
            None => false,
        };
        guard.replace(latency_avg);
        should_scale_up
    }

    /// Returns `true` when the pool sent at least 1000 commands since the last
    /// sample — busy enough to warrant another connection.
    ///
    /// NOTE(review): despite the `gt` in the name, the check is inclusive
    /// (`>= 1000`); kept as-is since `main`'s assertions rely on it.
    pub fn sent_gt_1000_commands(&self, stats: &PoolStats) -> bool {
        let total_commands = stats
            .clients
            .iter()
            .fold(0, |sum, (_, stats)| sum + stats.network_latency.samples);
        total_commands >= 1000
    }

    /// Returns `true` when the pool sent fewer than 100 commands since the
    /// last sample — idle enough to give up a connection.
    pub fn sent_lt_100_commands(&self, stats: &PoolStats) -> bool {
        let total_commands = stats
            .clients
            .iter()
            .fold(0, |sum, (_, stats)| sum + stats.network_latency.samples);
        total_commands < 100
    }
}
#[async_trait]
impl PoolScale for ScalePolicy {
fn scale(&self, usage: PoolStats) -> i64 {
if self.latency_increased(&usage) || self.sent_gt_1000_commands(&usage) {
1
} else if self.sent_lt_100_commands(&usage) {
-1
} else {
0
}
}
async fn on_added(&self, clients: Vec<Client>) {
debug!("Added {} client(s)", clients.len());
for client in clients.into_iter() {
client.on_error(|(error, server)| async move {
println!("Client {:?} disconnected with error: {:?}", server, error);
Ok(())
});
}
}
async fn on_failure(&self, error: Error) {
warn!("Failed to add client to pool: {:?}", error);
}
}
#[tokio::main]
async fn main() -> Result<(), Error> {
    pretty_env_logger::init();

    // NOTE(review): assumes a live Redis server reachable at
    // `redis-main:6379` with these credentials — the example cannot run
    // (and its assertions cannot hold) without it.
    let config = Config::from_url("redis://foo:bar@redis-main:6379")?;
    let pool_config = DynamicPoolConfig {
        // The pool keeps at least 2 and at most 20 connections.
        min_clients: 2,
        max_clients: 20,
        // Clients idle for 5 minutes become eligible for removal.
        max_idle_time: Duration::from_secs(5 * 60),
        scale: Arc::new(ScalePolicy::default()),
        #[cfg(feature = "dns")]
        resolver: None,
    };
    let pool = Builder::from_config(config)
        .set_pool_config(pool_config)
        .build_dynamic_pool()?;
    pool.init().await?;
    // Re-run `ScalePolicy::scale` against fresh usage stats every SAMPLE_INTERVAL.
    pool.start_scale_task(SAMPLE_INTERVAL);

    // Send >= 1000 commands so the next sample should add one client
    // (via `sent_gt_1000_commands`).
    for _ in 0 .. 1001 {
        let _: () = pool.next().incr("foo").await?;
    }
    // Wait just past one sampling interval so a scale pass has run.
    sleep(SAMPLE_INTERVAL.add(Duration::from_secs(1))).await;
    assert_eq!(pool.size(), 3);

    // Send < 100 commands so the next sample should remove one client
    // (via `sent_lt_100_commands`).
    for _ in 0 .. 42 {
        let _: () = pool.next().incr("foo").await?;
    }
    sleep(SAMPLE_INTERVAL.add(Duration::from_secs(1))).await;
    assert_eq!(pool.size(), 2);

    // The pool can also be scaled manually, bypassing the policy.
    pool.scale(1).await;
    pool.scale(-1).await;
    pool.quit().await?;
    Ok(())
}