surrealcs 0.4.4

The SurrealCS client code for SurrealDB
Documentation
//! Defines the actors and constructors for creating, managing, and routing messages to and
//! from TCP connections.
use crate::utils::ping_actor::ping_actor_constructor;
use reader::reader_actor;
use router::router_actor;
use surrealcs_kernel::{
	logging::messages::connections::id::create_id, messages::client::message::TransactionMessage,
};
use tokio::{net::TcpStream, sync::mpsc, task::JoinHandle};
use writer::writer_actor;

pub mod reader;
pub mod recovery_process;
pub mod router;
pub mod writer;

/// Connects to the server at `address` and wires up the actors for that connection.
///
/// This opens the TCP stream and then hands everything over to [`internal_constructor`].
///
/// # Notes
/// The writer channel (`writer_rx` / `writer_tx`) is created by the caller rather than here,
/// because the channel has to be allocated by an allocator in order to obtain the
/// `connection_index`.
///
/// # Arguments
/// * `address`: The address of the server to connect to
/// * `writer_rx`: The receiving end of the writer actor's channel
/// * `writer_tx`: The sending end of the writer actor's channel
/// * `connection_index`: The index of the connection in the allocator
///
/// # Panics
/// Panics if the TCP connection to `address` cannot be established, because the connection
/// result is unwrapped.
pub async fn constructor(
	address: String,
	writer_rx: mpsc::Receiver<TransactionMessage>,
	writer_tx: mpsc::Sender<TransactionMessage>,
	connection_index: usize,
) {
	// the address is only borrowed for the connect call as it is handed to the actors afterwards
	let connect_outcome = TcpStream::connect(&address).await;
	let stream = connect_outcome.unwrap();
	internal_constructor(address, writer_rx, writer_tx, connection_index, stream).await
}

/// Wires up the actors for an already established TCP connection.
///
/// The stream is split into a read half and a write half, and two bounded channels (capacity 32)
/// are created: one feeding the router actor and one for the ping actor. The writer, reader, and
/// router actors are each spawned as their own Tokio task, in that order, with the router being
/// handed the `JoinHandle` of the reader task. Finally the ping actor is constructed.
///
/// # Notes
/// The writer, reader, and router actors wait to receive a message before they emit one, so they
/// are spawned first. The ping actor emits messages into the actor system and is constructed last.
/// Every actor is given a copy of the connection ID created from `connection_index`.
///
/// # Arguments
/// * `address`: The address of the server the connection was made to
/// * `writer_rx`: The receiving end of the writer actor's channel
/// * `writer_tx`: The sending end of the writer actor's channel
/// * `connection_index`: The index of the connection in the allocator
/// * `stream`: The connected TCP stream
///
/// # Panics
/// Panics if `ping_actor_constructor` fails, because its result is unwrapped.
pub async fn internal_constructor(
	address: String,
	writer_rx: mpsc::Receiver<TransactionMessage>,
	writer_tx: mpsc::Sender<TransactionMessage>,
	connection_index: usize,
	stream: TcpStream,
) {
	let (read_half, write_half) = stream.into_split();

	// channel carrying messages to the router of this connection
	let (router_tx, router_rx) = mpsc::channel::<TransactionMessage>(32);

	let connection_id = create_id(connection_index);

	// channel for the ping actor (the writer actor holds a sender to it as well)
	let (ping_tx, ping_rx) = mpsc::channel::<TransactionMessage>(32);

	// the actors spawned below do not emit on their own, they receive before emitting
	tokio::spawn({
		// the writer gets its own copies, the originals are still needed further down
		let writer_tx = writer_tx.clone();
		let router_tx = router_tx.clone();
		let address = address.clone();
		let connection_id = connection_id.clone();
		let ping_tx = ping_tx.clone();
		async move {
			writer_actor(
				write_half,
				writer_rx,
				writer_tx,
				router_tx,
				address,
				connection_id,
				ping_tx,
			)
			.await;
		}
	});
	let reader_task: JoinHandle<()> = tokio::spawn({
		let connection_id = connection_id.clone();
		// the remaining router sender is moved into the reader so no extra sender stays alive here
		async move {
			reader_actor(read_half, router_tx, connection_id).await;
		}
	});
	tokio::spawn({
		let connection_id = connection_id.clone();
		async move {
			router_actor(router_rx, address, connection_id, reader_task).await;
		}
	});

	// the ping actor emits messages into the actor system, it takes ownership of the connection ID,
	// the writer sender, and both ends of the ping channel
	let ping_channel = (ping_tx, ping_rx);
	ping_actor_constructor(writer_tx, connection_id, ping_channel).await.unwrap()
}