use super::{
caching::caching_actor,
connection::{attempt_tcp_connection, ConnectionCreator},
expodential_backoff::{
attempt_connection_adapter, ConnectionAttemptResult, ExpodentialBackoffScheduler,
},
messages::CacheMessage,
};
use crate::connection::reader::reader_actor;
use nanoservices_utils::errors::NanoServiceError;
use surrealcs_kernel::messages::client::message::TransactionMessage;
use tokio::{
net::tcp::OwnedWriteHalf,
sync::mpsc,
time::{timeout, Duration},
};
pub async fn run_recovery_process(
rx: &mut mpsc::Receiver<TransactionMessage>,
tx: mpsc::Sender<TransactionMessage>,
router_tx: mpsc::Sender<TransactionMessage>,
address: String,
connection_id: String,
) -> Result<OwnedWriteHalf, NanoServiceError> {
let (caching_tx, caching_rx) = mpsc::channel::<CacheMessage<TransactionMessage>>(50);
let _caching_handle = tokio::spawn(async move { caching_actor(caching_rx).await });
let connection_handle = ConnectionCreator {
address,
};
let mut connection_scheduler = ExpodentialBackoffScheduler::new(connection_handle);
loop {
match attempt_connection_adapter(&mut connection_scheduler, attempt_tcp_connection).await {
ConnectionAttemptResult::Connection(stream) => {
let (reader, writer) = stream.into_split();
tokio::spawn(async move { reader_actor(reader, router_tx, connection_id).await });
let message = CacheMessage::Flush(tx);
caching_tx.send(message).await.unwrap();
return Ok(writer);
}
ConnectionAttemptResult::Error(e) => {
tracing::error!("Error connecting to server: {:?} -> {}", e, connection_id);
}
ConnectionAttemptResult::NotAttempted => {
continue;
}
}
let timeout_duration = Duration::from_millis(50);
for _ in 0..4 {
match timeout(timeout_duration, rx.recv()).await {
Ok(Some(message)) => {
caching_tx.send(CacheMessage::Put(message)).await.unwrap();
}
Ok(None) => {
continue;
}
Err(_) => {
continue;
}
}
}
}
}