surrealcs 0.4.4

The SurrealCS client code for SurrealDB
Documentation
//! The recovery process runs when the connection is lost and the client needs to reconnect to the server. It is a separate actor that is spawned when the connection is lost, and it keeps attempting to reconnect to the server until the connection is re-established. It then spawns the connection actors and the ping actor (which checks that the connection is live), and finally terminates.
//! The recovery process is separate from the connection itself but is used to try to reconnect to the server. It is triggered as follows:
//!
//! - the writer actor realises that there is a connection error when failing to write to the TCP stream
//! - the writer then calls the `run_recovery_process` function which is in this module.
//!
//! ## Run Recovery Process Steps
//! - create the caching actor
//! - create the connection handle
//! - loop until the connection is established
//! - create the reader actor
//! - flush the cache
//! - return the writer part of the TCP stream back to the writer actor
use super::{
	caching::caching_actor,
	connection::{attempt_tcp_connection, ConnectionCreator},
	expodential_backoff::{
		attempt_connection_adapter, ConnectionAttemptResult, ExpodentialBackoffScheduler,
	},
	messages::CacheMessage,
};
use crate::connection::reader::reader_actor;
use nanoservices_utils::errors::NanoServiceError;
use surrealcs_kernel::messages::client::message::TransactionMessage;
use tokio::{
	net::tcp::OwnedWriteHalf,
	sync::mpsc,
	time::{timeout, Duration},
};

/// Coordinates the recovery process when the connection is lost.
///
/// # Notes
/// This is not directly unit tested due to the direct handling of the TCP stream.
///
/// # Arguments
/// * `rx`: The receiver for the transaction messages (will be used to cache messages sent to the connection)
/// * `tx`: The sender for the transaction messages (will be used to send messages to the connection)
/// * `router_tx`: The sender for the router actor
/// * `address`: The address of the server to reconnect to
/// * `connection_id`: The ID of the connection
pub async fn run_recovery_process(
	rx: &mut mpsc::Receiver<TransactionMessage>,
	tx: mpsc::Sender<TransactionMessage>,
	router_tx: mpsc::Sender<TransactionMessage>,
	address: String,
	connection_id: String,
) -> Result<OwnedWriteHalf, NanoServiceError> {
	// create the caching actor
	let (caching_tx, caching_rx) = mpsc::channel::<CacheMessage<TransactionMessage>>(50);
	let _caching_handle = tokio::spawn(async move { caching_actor(caching_rx).await });

	// create the connection handle
	let connection_handle = ConnectionCreator {
		address,
	};
	let mut connection_scheduler = ExpodentialBackoffScheduler::new(connection_handle);

	loop {
		match attempt_connection_adapter(&mut connection_scheduler, attempt_tcp_connection).await {
			ConnectionAttemptResult::Connection(stream) => {
				// create the reader actor
				let (reader, writer) = stream.into_split();
				tokio::spawn(async move { reader_actor(reader, router_tx, connection_id).await });

				// flush the cache
				let message = CacheMessage::Flush(tx);
				// there are no unwraps in the cache, if the
				caching_tx.send(message).await.unwrap();
				return Ok(writer);
			}
			ConnectionAttemptResult::Error(e) => {
				tracing::error!("Error connecting to server: {:?} -> {}", e, connection_id);
			}
			ConnectionAttemptResult::NotAttempted => {
				continue;
			}
		}
		let timeout_duration = Duration::from_millis(50);
		for _ in 0..4 {
			match timeout(timeout_duration, rx.recv()).await {
				Ok(Some(message)) => {
					caching_tx.send(CacheMessage::Put(message)).await.unwrap();
				}
				Ok(None) => {
					continue;
				}
				Err(_) => {
					continue;
				}
			}
		}
	}
}