qos_core 0.6.0

Core components and logic for QuorumOS applications
Documentation
//! Streaming, socket-based client for connecting to
//! [`crate::server::SocketServer`].

use std::time::Duration;

use crate::io::{IOError, SharedStreamPool, SocketAddress, StreamPool};

/// Enclave client error.
///
/// Returned by the fallible [`SocketClient`] operations in this module. The
/// `From` impls below allow `?` to lift the wrapped error types into it.
#[derive(Debug)]
pub enum ClientError {
	/// [`IOError`] wrapper.
	IOError(IOError),
	/// `borsh::io::Error` wrapper.
	BorshError(borsh::io::Error),
	/// Invalid enclave response to a request (e.g. mismatch)
	// NOTE(review): not constructed anywhere in this module; presumably
	// produced by higher-level callers that validate responses — confirm.
	ResponseMismatch,
}

impl From<IOError> for ClientError {
	fn from(err: IOError) -> Self {
		Self::IOError(err)
	}
}

impl From<borsh::io::Error> for ClientError {
	fn from(err: borsh::io::Error) -> Self {
		Self::BorshError(err)
	}
}
/// Client for communicating with the enclave
/// [`crate::server::SocketServer`].
///
/// NOTE(review): `Clone` is derived; clones presumably share the same
/// underlying pool since the field is a `SharedStreamPool` — confirm against
/// `crate::io`.
#[derive(Clone, Debug)]
pub struct SocketClient {
	/// Shared pool of streams used to reach the server.
	pool: SharedStreamPool,
	/// Upper bound applied to each `call`.
	timeout: Duration,
}

impl SocketClient {
	/// Create a new client with given `StreamPool`.
	///
	/// `timeout` bounds every [`Self::call`]; it can be changed afterwards
	/// with [`Self::set_timeout`].
	#[must_use]
	pub fn new(pool: SharedStreamPool, timeout: Duration) -> Self {
		Self { pool, timeout }
	}

	/// Create a new client from a single `SocketAddress`. This creates an
	/// implicit single socket `StreamPool`.
	///
	/// # Errors
	///
	/// Returns the [`IOError`] reported by `StreamPool::new` if the pool
	/// cannot be created.
	pub fn single(
		addr: SocketAddress,
		timeout: Duration,
	) -> Result<Self, IOError> {
		let pool = StreamPool::new(addr, 1)?.shared();

		Ok(Self { pool, timeout })
	}

	/// Returns true if all callable streams are currently exhausted
	pub async fn busy(&self) -> bool {
		self.pool.read().await.busy().await
	}

	/// Send raw bytes and wait for a response until the client's configured
	/// timeout.
	///
	/// The timeout applies to the entire operation: acquiring the pool's read
	/// lock, obtaining a stream from the pool, and the request/response
	/// exchange on that stream.
	///
	/// # Errors
	///
	/// Returns `IOError::RecvTimeout` (wrapped in [`ClientError`]) if the
	/// timeout elapses first. Any error from the underlying stream call is
	/// converted into [`ClientError`] and returned.
	pub async fn call(&self, request: &[u8]) -> Result<Vec<u8>, ClientError> {
		let exchange = async {
			// Take the read lock inside the timed future: while `expand_to`
			// holds or waits for the write lock this await can stall, and that
			// wait has to count against the deadline as well.
			let pool = self.pool.read().await;
			let mut stream = pool.get().await;
			stream.call(request).await
		};

		match tokio::time::timeout(self.timeout, exchange).await {
			Ok(outcome) => Ok(outcome?),
			Err(_elapsed) => Err(IOError::RecvTimeout.into()),
		}
	}

	/// Sets the client's timeout value
	pub fn set_timeout(&mut self, timeout: Duration) {
		self.timeout = timeout;
	}

	/// Expands the underlying `StreamPool` to given `pool_size`
	///
	/// Takes the pool's write lock for the duration of the expansion.
	///
	/// # Errors
	///
	/// Propagates the error returned by the pool's `expand_to`, converted
	/// into [`ClientError`].
	pub async fn expand_to(
		&mut self,
		pool_size: u8,
	) -> Result<(), ClientError> {
		self.pool.write().await.expand_to(pool_size)?;

		Ok(())
	}

	/// Attempt a one-off connection, used for tests
	///
	/// Obtains a stream from the pool and calls `connect` on it. This is not
	/// bounded by the client's timeout.
	///
	/// # Errors
	///
	/// Returns the [`IOError`] reported by the stream's `connect`.
	pub async fn try_connect(&self) -> Result<(), IOError> {
		let pool = self.pool.read().await;
		let mut stream = pool.get().await;

		stream.connect().await
	}
}