surrealcs 0.4.4

The SurrealCS client code for SurrealDB
Documentation
//! Defines the processes and interfaces for an exponential backoff recovery process.
use super::connection::ConnectionCreator;
use futures::Future;
use nanoservices_utils::errors::NanoServiceError;

/// The result of a connection attempt made through [`attempt_connection_adapter`].
///
/// `Y` is the value produced by a successful attempt. It is generic so that tests can
/// substitute a simple type for a real connection.
///
/// # Variants
/// * `Connection`: The connection to the server (generic for testing purposes)
/// * `Error`: An error that occurred during the connection attempt
/// * `NotAttempted`: The connection was not attempted due to not enough time passing
#[derive(Debug, PartialEq)]
pub enum ConnectionAttemptResult<Y> {
	/// The attempt ran and succeeded; carries the value the connection closure returned.
	Connection(Y),
	/// The attempt ran and failed; carries the error that was reported.
	Error(NanoServiceError),
	/// The scheduler did not yield its connection handle, so no attempt was made.
	NotAttempted,
}

/// The result of asking the scheduler for its connection handle.
///
/// This type is private to the module; it is what the scheduler's
/// `yield_connection_handle` method returns.
///
/// # Variants
/// * `Yielded`: The connection handle was yielded
/// * `NotYielded`: The connection handle was not yielded due to not enough time passing
#[derive(Debug, PartialEq)]
enum YieldResult<'a> {
	/// Enough time has passed; carries a shared reference to the scheduler's `ConnectionCreator`.
	Yielded(&'a ConnectionCreator),
	/// Not enough time has passed since the last yield, so no handle is handed out.
	NotYielded,
}

/// The adapter for attempting a connection to the server with expodential backoff.
///
/// # Notes
/// This is the main interface for expodential backoff recovery process.
///
/// # Arguments
/// * `scheduler`: The scheduler for the expodential backoff
/// * `connection_closure`: The closure that will attempt to connect to the server
///
/// # Returns
/// A `ConnectionAttemptResult` with the result of the connection attempt
pub async fn attempt_connection_adapter<F, Fut, Y>(
	scheduler: &mut ExpodentialBackoffScheduler,
	connection_closure: F,
) -> ConnectionAttemptResult<Y>
where
	F: FnOnce(String) -> Fut + Copy + Send,
	Fut: Future<Output = Result<Y, NanoServiceError>> + Send,
{
	let connection = match scheduler.yield_connection_handle() {
		YieldResult::Yielded(connection_handle) => connection_handle,
		YieldResult::NotYielded => return ConnectionAttemptResult::NotAttempted,
	};
	match connection.attempt_connection(connection_closure).await {
		Ok(stream) => ConnectionAttemptResult::Connection(stream),
		Err(e) => ConnectionAttemptResult::Error(e),
	}
}

/// The scheduler for the exponential backoff recovery process.
///
/// The scheduler only hands out its connection handle once at least `current_miliseconds`
/// have elapsed since `snapshot_datetime`. Each time the handle is handed out the delay is
/// doubled, up to `max_miliseconds`.
///
/// # Fields
/// * `current_miliseconds`: The current miliseconds to wait before attempting to connect
/// * `max_miliseconds`: The maximum miliseconds to wait before attempting to connect (`current_miliseconds` will not exceed this value)
/// * `snapshot_datetime`: The snapshot of the current datetime of the last connection attempt
/// * `connection_handle`: The connection handle to the server
pub struct ExpodentialBackoffScheduler {
	/// Delay, in milliseconds, that must elapse before the handle is yielded again (starts at 200 in `new`).
	pub current_miliseconds: u64,
	/// Upper bound, in milliseconds, that `current_miliseconds` is clamped to after doubling (5000 in `new`).
	pub max_miliseconds: u64,
	/// UTC timestamp taken at construction and refreshed every time the handle is yielded.
	pub snapshot_datetime: chrono::DateTime<chrono::Utc>,
	/// The connection handle that is lent out when the scheduler yields.
	pub connection_handle: ConnectionCreator,
}

impl ExpodentialBackoffScheduler {
	/// The constructor for the exponential backoff scheduler.
	///
	/// The scheduler starts with a 200 millisecond delay, a 5000 millisecond ceiling and a
	/// snapshot of the current UTC time.
	///
	/// # Arguments
	/// * `connection_handle`: The connection handle to the server that will be used for the connection attempt
	///
	/// # Returns
	/// A new instance of the exponential backoff scheduler
	pub fn new(connection_handle: ConnectionCreator) -> Self {
		Self {
			current_miliseconds: 200,
			max_miliseconds: 5000,
			snapshot_datetime: chrono::Utc::now(),
			connection_handle,
		}
	}

	/// Yields the connection handle if enough time has passed.
	///
	/// When at least `current_miliseconds` have elapsed since `snapshot_datetime`, the snapshot
	/// is moved to the current time, the delay is doubled (never exceeding `max_miliseconds`)
	/// and the handle is yielded. Otherwise the scheduler's delay is left untouched.
	///
	/// # Notes
	/// The elapsed time is measured with the UTC wall clock, which may step backwards. A
	/// negative elapsed time is not treated as "enough time has passed": the snapshot is
	/// re-anchored to the current time and nothing is yielded, so the next yield happens at
	/// most `current_miliseconds` after the clock adjustment was observed.
	///
	/// # Returns
	/// A `YieldResult` with the result of yielding the connection handle
	fn yield_connection_handle(&mut self) -> YieldResult<'_> {
		let now = chrono::Utc::now();
		let elapsed_miliseconds = (now - self.snapshot_datetime).num_milliseconds();

		if elapsed_miliseconds < 0 {
			// The clock moved backwards. Casting a negative value to `u64` would wrap to a
			// huge number and trigger an immediate attempt, so restart the wait window instead.
			self.snapshot_datetime = now;
			return YieldResult::NotYielded;
		}

		// Non-negative at this point, so `unsigned_abs` is a lossless conversion.
		if elapsed_miliseconds.unsigned_abs() < self.current_miliseconds {
			return YieldResult::NotYielded;
		}

		self.snapshot_datetime = now;
		// `current_miliseconds` is a public field and may hold any value; saturate rather
		// than overflow when doubling, then clamp to the configured ceiling.
		self.current_miliseconds = self.current_miliseconds.saturating_mul(2).min(self.max_miliseconds);
		YieldResult::Yielded(&self.connection_handle)
	}
}

#[cfg(test)]
mod tests {

	use super::*;
	use std::sync::atomic::AtomicUsize;
	use std::sync::atomic::Ordering::Relaxed;

	/// Counts how often the mocked connection closure has been invoked (only `test_adapter` uses it).
	static CONNECTION_ATTEMPT: AtomicUsize = AtomicUsize::new(0);

	/// To mock the connection attempt: fails on the first two calls and succeeds on the third.
	async fn check_connection(_address: String) -> Result<bool, NanoServiceError> {
		// `fetch_add` returns the value held *before* the increment, so `2` marks the third call.
		let count = CONNECTION_ATTEMPT.fetch_add(1, Relaxed);
		if count == 2 {
			return Ok(true);
		}
		Err(NanoServiceError::new(
			"Error connecting to server".to_string(),
			nanoservices_utils::errors::NanoServiceErrorStatus::Unknown,
		))
	}

	/// Builds a scheduler around a connection handle that points at a fake address.
	fn fake_scheduler() -> ExpodentialBackoffScheduler {
		ExpodentialBackoffScheduler::new(ConnectionCreator {
			address: "fake".to_string(),
		})
	}

	/// Blocks the current thread for the given number of milliseconds.
	fn pause(milliseconds: u64) {
		std::thread::sleep(std::time::Duration::from_millis(milliseconds));
	}

	/// The delay doubles on every yield and the snapshot only moves when a yield happens.
	#[test]
	fn test_increment() {
		let mut adapter = fake_scheduler();
		let snapshot = adapter.snapshot_datetime;

		// Asked straight away: the initial 200ms have not elapsed yet.
		assert_eq!(adapter.yield_connection_handle(), YieldResult::NotYielded);
		assert_eq!(snapshot, adapter.snapshot_datetime);

		pause(200);
		assert!(
			matches!(adapter.yield_connection_handle(), YieldResult::Yielded(_)),
			"Expected to yield connection handle"
		);
		assert_eq!(adapter.current_miliseconds, 400);
		assert_ne!(snapshot, adapter.snapshot_datetime); // snapshot should have been updated
		let snapshot = adapter.snapshot_datetime;

		// The delay is now 400ms, so neither an immediate call nor one after 200ms may yield.
		assert_eq!(adapter.yield_connection_handle(), YieldResult::NotYielded);
		assert_eq!(snapshot, adapter.snapshot_datetime);

		pause(200);
		assert_eq!(adapter.yield_connection_handle(), YieldResult::NotYielded);
		assert_eq!(snapshot, adapter.snapshot_datetime);

		// A further 200ms brings the total to 400ms, which is enough.
		pause(200);
		assert!(
			matches!(adapter.yield_connection_handle(), YieldResult::Yielded(_)),
			"Expected to yield connection handle"
		);
		assert_eq!(adapter.current_miliseconds, 800);
		assert_ne!(snapshot, adapter.snapshot_datetime); // snapshot should have been updated
	}

	/// The delay never grows beyond `max_miliseconds`, while the snapshot still moves on every yield.
	#[test]
	fn test_max_miliseconds() {
		let mut adapter = fake_scheduler();
		adapter.max_miliseconds = 20;
		adapter.current_miliseconds = 20;
		let snapshot = adapter.snapshot_datetime;

		pause(20);
		assert!(
			matches!(adapter.yield_connection_handle(), YieldResult::Yielded(_)),
			"Expected to yield connection handle"
		);
		assert_eq!(adapter.current_miliseconds, 20); // current_miliseconds should not have changed due to max_miliseconds
		assert_ne!(snapshot, adapter.snapshot_datetime); // snapshot should have been updated

		// Compare the second yield against the snapshot written by the first one; comparing
		// against the initial snapshot would pass even if the second yield did not update it.
		let snapshot = adapter.snapshot_datetime;

		pause(20);
		assert!(
			matches!(adapter.yield_connection_handle(), YieldResult::Yielded(_)),
			"Expected to yield connection handle"
		);
		assert_eq!(adapter.current_miliseconds, 20); // current_miliseconds should not have changed due to max_miliseconds
		assert_ne!(snapshot, adapter.snapshot_datetime); // snapshot should have been updated
	}

	/// The adapter only invokes the connection closure once the scheduler yields its handle.
	///
	/// The sleeps block the thread on purpose to let wall-clock time pass between attempts.
	// NOTE(review): `tokio::time::sleep` would be the non-blocking alternative if the crate
	// enables tokio's `time` feature — confirm before switching.
	#[tokio::test]
	async fn test_adapter() {
		let mut scheduler = fake_scheduler();

		// Stage 1: asked straight away, so the scheduler must not allow an attempt.
		let outcome = attempt_connection_adapter(&mut scheduler, check_connection).await;
		match outcome {
			ConnectionAttemptResult::Connection(_) => {
				panic!("Expected to not connect to server");
			}
			ConnectionAttemptResult::Error(_) => {
				panic!("Expected to not attempt connect to server");
			}
			ConnectionAttemptResult::NotAttempted => {}
		}
		// assert that the connection attempt was not made
		assert_eq!(CONNECTION_ATTEMPT.load(Relaxed), 0);

		// Stage 2: after 200ms the first attempt is made and the mock fails it.
		pause(200);
		let outcome = attempt_connection_adapter(&mut scheduler, check_connection).await;
		match outcome {
			ConnectionAttemptResult::Connection(_) => {
				panic!("Expected to not connect to server");
			}
			ConnectionAttemptResult::Error(_) => {}
			ConnectionAttemptResult::NotAttempted => {
				panic!("Expected to attempt connect to server");
			}
		}
		// assert that the connection attempt was made
		assert_eq!(CONNECTION_ATTEMPT.load(Relaxed), 1);

		// Stage 3: after 400ms the second attempt is made and the mock fails it again.
		pause(400);
		let outcome = attempt_connection_adapter(&mut scheduler, check_connection).await;
		match outcome {
			ConnectionAttemptResult::Connection(_) => {
				panic!("Expected to not connect to server");
			}
			ConnectionAttemptResult::Error(_) => {}
			ConnectionAttemptResult::NotAttempted => {
				panic!("Expected to attempt connect to server");
			}
		}
		// assert that the connection attempt was made
		assert_eq!(CONNECTION_ATTEMPT.load(Relaxed), 2);

		// Stage 4: after 800ms the third attempt is made and the mock lets it succeed.
		pause(800);
		let outcome = attempt_connection_adapter(&mut scheduler, check_connection).await;
		match outcome {
			ConnectionAttemptResult::Connection(_) => {}
			ConnectionAttemptResult::Error(_) => {
				panic!("Expected to connect to server");
			}
			ConnectionAttemptResult::NotAttempted => {
				panic!("Expected to attempt connect to server");
			}
		}
		// assert that the connection attempt was made
		assert_eq!(CONNECTION_ATTEMPT.load(Relaxed), 3);
	}
}