use super::connection::ConnectionCreator;
use futures::Future;
use nanoservices_utils::errors::NanoServiceError;
/// Outcome of one call to [`attempt_connection_adapter`].
///
/// `Y` is the success value produced by the caller-supplied connection
/// closure.
#[derive(Debug, PartialEq)]
pub enum ConnectionAttemptResult<Y> {
    /// The scheduler yielded its handle and the attempt returned `Ok`.
    Connection(Y),
    /// The scheduler yielded its handle but the attempt returned `Err`.
    Error(NanoServiceError),
    /// The scheduler did not yield its handle, so nothing was attempted.
    NotAttempted,
}
/// Answer given by `ExpodentialBackoffScheduler::yield_connection_handle`.
#[derive(Debug, PartialEq)]
enum YieldResult<'a> {
    /// The wait has elapsed; the scheduler lends out its connection handle.
    Yielded(&'a ConnectionCreator),
    /// The wait has not elapsed yet; no handle is handed out.
    NotYielded,
}
pub async fn attempt_connection_adapter<F, Fut, Y>(
scheduler: &mut ExpodentialBackoffScheduler,
connection_closure: F,
) -> ConnectionAttemptResult<Y>
where
F: FnOnce(String) -> Fut + Copy + Send,
Fut: Future<Output = Result<Y, NanoServiceError>> + Send,
{
let connection = match scheduler.yield_connection_handle() {
YieldResult::Yielded(connection_handle) => connection_handle,
YieldResult::NotYielded => return ConnectionAttemptResult::NotAttempted,
};
match connection.attempt_connection(connection_closure).await {
Ok(stream) => ConnectionAttemptResult::Connection(stream),
Err(e) => ConnectionAttemptResult::Error(e),
}
}
/// Back-off state that decides when the next connection attempt may happen.
///
/// The wait starts at the value set by `new` and doubles every time the
/// handle is yielded, up to `max_miliseconds`.
pub struct ExpodentialBackoffScheduler {
    /// Milliseconds that must have passed since `snapshot_datetime` before the
    /// handle is yielded again.
    pub current_miliseconds: u64,
    /// Upper bound applied to `current_miliseconds` after each doubling.
    pub max_miliseconds: u64,
    /// Wall-clock instant the current wait is measured from; set at
    /// construction and refreshed each time the handle is yielded.
    pub snapshot_datetime: chrono::DateTime<chrono::Utc>,
    /// Connection handle lent out by reference when the wait has elapsed.
    pub connection_handle: ConnectionCreator,
}
impl ExpodentialBackoffScheduler {
    /// Creates a scheduler around `connection_handle`.
    ///
    /// The first wait is 200 ms, the cap is 5000 ms, and the wait is measured
    /// from the moment of construction.
    pub fn new(connection_handle: ConnectionCreator) -> Self {
        Self {
            current_miliseconds: 200,
            max_miliseconds: 5000,
            snapshot_datetime: chrono::Utc::now(),
            connection_handle,
        }
    }

    /// Lends out the connection handle if the current wait has elapsed.
    ///
    /// On a yield the snapshot is moved to "now" and the wait is doubled,
    /// capped at `max_miliseconds`. Otherwise the state is left untouched and
    /// `YieldResult::NotYielded` is returned.
    ///
    /// If the wall clock has moved backwards since the snapshot, the snapshot
    /// is re-anchored to "now" and nothing is yielded, so the wait restarts
    /// from the corrected time.
    fn yield_connection_handle(&mut self) -> YieldResult<'_> {
        let now = chrono::Utc::now();
        let elapsed = (now - self.snapshot_datetime).num_milliseconds();
        if elapsed < 0 {
            // A negative value cast with `as u64` would wrap to a huge number
            // and make the scheduler yield immediately, bypassing the back-off.
            self.snapshot_datetime = now;
            return YieldResult::NotYielded;
        }
        // Lossless: `elapsed` is known to be non-negative here.
        let elapsed_miliseconds = elapsed as u64;
        if elapsed_miliseconds < self.current_miliseconds {
            return YieldResult::NotYielded;
        }
        self.snapshot_datetime = now;
        // Saturate so that a very large caller-set wait cannot overflow before
        // the cap is applied.
        self.current_miliseconds = self
            .current_miliseconds
            .saturating_mul(2)
            .min(self.max_miliseconds);
        YieldResult::Yielded(&self.connection_handle)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering::Relaxed;

    /// Counts how many times `check_connection` has been invoked.
    static CONNECTION_ATTEMPT: AtomicUsize = AtomicUsize::new(0);

    /// Fake connector that succeeds only on its third invocation.
    async fn check_connection(_address: String) -> Result<bool, NanoServiceError> {
        let count = CONNECTION_ATTEMPT.fetch_add(1, Relaxed);
        if count == 2 {
            return Ok(true);
        }
        Err(NanoServiceError::new(
            "Error connecting to server".to_string(),
            nanoservices_utils::errors::NanoServiceErrorStatus::Unknown,
        ))
    }

    /// Builds a scheduler with default timings around a dummy handle.
    fn fake_scheduler() -> ExpodentialBackoffScheduler {
        ExpodentialBackoffScheduler::new(ConnectionCreator {
            address: "fake".to_string(),
        })
    }

    /// Blocks the current thread for `miliseconds`.
    ///
    /// The scheduler reads the wall clock, so the tests have to really wait.
    fn pause(miliseconds: u64) {
        std::thread::sleep(std::time::Duration::from_millis(miliseconds));
    }

    #[test]
    fn test_increment() {
        let mut adapter = fake_scheduler();

        // Straight after construction the 200 ms wait has not elapsed.
        let snapshot = adapter.snapshot_datetime;
        assert_eq!(adapter.yield_connection_handle(), YieldResult::NotYielded);
        assert_eq!(snapshot, adapter.snapshot_datetime);

        pause(200);
        assert!(
            matches!(adapter.yield_connection_handle(), YieldResult::Yielded(_)),
            "Expected to yield connection handle"
        );
        assert_eq!(adapter.current_miliseconds, 400);
        assert_ne!(snapshot, adapter.snapshot_datetime);

        // The wait is now 400 ms: nothing immediately, nothing after 200 ms.
        let snapshot = adapter.snapshot_datetime;
        assert_eq!(adapter.yield_connection_handle(), YieldResult::NotYielded);
        assert_eq!(snapshot, adapter.snapshot_datetime);

        pause(200);
        assert_eq!(adapter.yield_connection_handle(), YieldResult::NotYielded);
        assert_eq!(snapshot, adapter.snapshot_datetime);

        pause(200);
        assert!(
            matches!(adapter.yield_connection_handle(), YieldResult::Yielded(_)),
            "Expected to yield connection handle"
        );
        assert_eq!(adapter.current_miliseconds, 800);
        assert_ne!(snapshot, adapter.snapshot_datetime);
    }

    #[test]
    fn test_max_miliseconds() {
        let mut adapter = fake_scheduler();
        adapter.max_miliseconds = 20;
        adapter.current_miliseconds = 20;

        let snapshot = adapter.snapshot_datetime;
        pause(20);
        assert!(
            matches!(adapter.yield_connection_handle(), YieldResult::Yielded(_)),
            "Expected to yield connection handle"
        );
        assert_eq!(adapter.current_miliseconds, 20);
        assert_ne!(snapshot, adapter.snapshot_datetime);

        // Re-take the snapshot so the next check proves the second yield
        // refreshed it, rather than comparing against the stale initial value.
        let snapshot = adapter.snapshot_datetime;
        pause(20);
        assert!(
            matches!(adapter.yield_connection_handle(), YieldResult::Yielded(_)),
            "Expected to yield connection handle"
        );
        assert_eq!(adapter.current_miliseconds, 20);
        assert_ne!(snapshot, adapter.snapshot_datetime);
    }

    // NOTE: `pause` blocks the thread inside an async test. That is tolerated
    // here because `#[tokio::test]` defaults to a current-thread runtime and
    // nothing else is scheduled on it.
    #[tokio::test]
    async fn test_adapter() {
        let mut scheduler = fake_scheduler();

        // Inside the initial 200 ms window: the closure must not run.
        let outcome = attempt_connection_adapter(&mut scheduler, check_connection).await;
        assert!(
            matches!(outcome, ConnectionAttemptResult::NotAttempted),
            "Expected no connection attempt before the backoff elapsed"
        );
        assert_eq!(CONNECTION_ATTEMPT.load(Relaxed), 0);

        pause(200);
        let outcome = attempt_connection_adapter(&mut scheduler, check_connection).await;
        match outcome {
            ConnectionAttemptResult::Connection(_) => {
                panic!("Expected to not connect to server");
            }
            ConnectionAttemptResult::Error(_) => {}
            ConnectionAttemptResult::NotAttempted => {
                panic!("Expected to attempt connect to server");
            }
        }
        assert_eq!(CONNECTION_ATTEMPT.load(Relaxed), 1);

        pause(400);
        let outcome = attempt_connection_adapter(&mut scheduler, check_connection).await;
        match outcome {
            ConnectionAttemptResult::Connection(_) => {
                panic!("Expected to not connect to server");
            }
            ConnectionAttemptResult::Error(_) => {}
            ConnectionAttemptResult::NotAttempted => {
                panic!("Expected to attempt connect to server");
            }
        }
        assert_eq!(CONNECTION_ATTEMPT.load(Relaxed), 2);

        pause(800);
        let outcome = attempt_connection_adapter(&mut scheduler, check_connection).await;
        match outcome {
            ConnectionAttemptResult::Connection(_) => {}
            ConnectionAttemptResult::Error(_) => {
                panic!("Expected to connect to server");
            }
            ConnectionAttemptResult::NotAttempted => {
                panic!("Expected to attempt connect to server");
            }
        }
        assert_eq!(CONNECTION_ATTEMPT.load(Relaxed), 3);
    }
}