use crate::utils::ping_actor::ping_actor_constructor;
use reader::reader_actor;
use router::router_actor;
use surrealcs_kernel::{
logging::messages::connections::id::create_id, messages::client::message::TransactionMessage,
};
use tokio::{net::TcpStream, sync::mpsc, task::JoinHandle};
use writer::writer_actor;
pub mod reader;
pub mod recovery_process;
pub mod router;
pub mod writer;
pub async fn constructor(
address: String,
writer_rx: mpsc::Receiver<TransactionMessage>,
writer_tx: mpsc::Sender<TransactionMessage>,
connection_index: usize,
) {
let stream = TcpStream::connect(&address).await.unwrap();
internal_constructor(address, writer_rx, writer_tx, connection_index, stream).await;
}
pub async fn internal_constructor(
address: String,
writer_rx: mpsc::Receiver<TransactionMessage>,
writer_tx: mpsc::Sender<TransactionMessage>,
connection_index: usize,
stream: TcpStream,
) {
let (reader, writer) = stream.into_split();
let address_ref = address.clone();
let (r_tx, r_rx) = mpsc::channel::<TransactionMessage>(32);
let r_tx_ref = r_tx.clone();
let connection_id = create_id(connection_index);
let connection_id_ref = connection_id.clone();
let connection_id_ref_2 = connection_id.clone();
let connection_id_ref_3 = connection_id.clone();
let (ping_tx, ping_rx) = mpsc::channel::<TransactionMessage>(32);
let ping_tx_ref = ping_tx.clone();
let writer_tx_ref = writer_tx.clone();
tokio::spawn(async move {
writer_actor(
writer,
writer_rx,
writer_tx_ref,
r_tx,
address_ref,
connection_id_ref_3,
ping_tx_ref,
)
.await;
});
let reader_actor_handle: JoinHandle<()> = tokio::spawn(async move {
reader_actor(reader, r_tx_ref, connection_id_ref).await;
});
tokio::spawn(async move {
router_actor(r_rx, address, connection_id_ref_2, reader_actor_handle).await;
});
ping_actor_constructor(writer_tx, connection_id, (ping_tx, ping_rx)).await.unwrap()
}