mod dispatcher;
mod router;
mod transmission;
pub use dispatcher::{EventQueue, EventReceiver};
pub use simploxide_core::SimplexVersion;
pub use tokio_tungstenite::{self, tungstenite};
use futures::StreamExt;
use serde::Deserialize;
use simploxide_core::VersionInfo;
use tokio::sync::{oneshot, watch};
use tokio_tungstenite::{
MaybeTlsStream, WebSocketStream, connect_async,
tungstenite::{Message, client::IntoClientRequest as _},
};
use tokio_util::sync::CancellationToken;
use {router::ClientRouter, transmission::Transmitter};
use std::sync::{
Arc,
atomic::{AtomicUsize, Ordering},
};
/// An event received from the daemon, carried as an owned string.
pub type Event = String;
/// A reply to a request; it shares the string representation of [`Event`].
pub type Response = Event;
/// Reference-counted WebSocket error, so one failure value can be cloned cheaply.
pub type Error = Arc<tungstenite::Error>;
/// Result alias using this module's [`Error`]; the success type defaults to `()`.
pub type Result<T = ()> = ::std::result::Result<T, Error>;
/// Alias for the dispatcher's [`EventQueue`], as returned by [`connect`].
pub type RawEventQueue = EventQueue;
// Sink half of the split WebSocket stream (outgoing messages).
type WsOut =
futures::stream::SplitSink<WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>, Message>;
// Stream half of the split WebSocket stream (incoming messages).
type WsIn = futures::stream::SplitStream<WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>>;
// Sending side of the boolean shutdown flag; `connect` hands it to the router.
type ShutdownEmitter = watch::Sender<bool>;
// Receiving side of the boolean shutdown flag; `RawClient::disconnect` waits
// until the observed value becomes `true`.
type ShutdownSignal = watch::Receiver<bool>;
// Counter backing `next_request_id`; starts at 0 and is shared by everything
// that calls `next_request_id`.
static REQUEST_ID: AtomicUsize = AtomicUsize::new(0);
/// Compiled only when the `cli` feature is enabled.
#[cfg(feature = "cli")]
pub mod cli;
// Identifier attached to each outgoing request.
type RequestId = usize;
/// Hands out the next request identifier.
///
/// Returns the value the global counter held *before* it was bumped, so the
/// very first call yields `0`. The increment is atomic with relaxed ordering
/// and wraps around on overflow.
fn next_request_id() -> RequestId {
    let previous = REQUEST_ID.fetch_add(1, Ordering::Relaxed);
    previous
}
/// Opens a WebSocket connection to the daemon at `simplex_daemon_url` and
/// wires up the router, transmitter and dispatcher around it.
///
/// On success returns the [`RawClient`] used to issue requests together with
/// the [`RawEventQueue`] produced by the dispatcher.
///
/// # Errors
///
/// Fails when the URL cannot be turned into a client request or when the
/// WebSocket connection cannot be established.
pub async fn connect(simplex_daemon_url: &str) -> tungstenite::Result<(RawClient, RawEventQueue)> {
    let request = simplex_daemon_url.into_client_request()?;
    let (socket, _) = connect_async(request).await?;
    let (sink, stream) = socket.split();

    // Coordination primitives shared between the background components.
    let (shutdown_emitter, shutdown) = watch::channel(false);
    let (interrupt_tx, interrupt_rx) = oneshot::channel();
    let cancel_dispatch = CancellationToken::new();

    // The component init order is kept as-is: router, transmitter, dispatcher.
    let (routing_handle, response_router) =
        router::init(cancel_dispatch.clone(), interrupt_tx, shutdown_emitter);
    let transmitter = transmission::init(sink, interrupt_rx);
    let event_queue = dispatcher::init(stream, response_router, cancel_dispatch);

    let client = RawClient {
        tx: transmitter,
        router: routing_handle,
        shutdown,
    };
    Ok((client, event_queue))
}
/// Cloneable handle for talking to the daemon over an established connection.
///
/// Created by [`connect`].
#[derive(Clone)]
pub struct RawClient {
// Used by `send` to put a request on the wire.
tx: Transmitter,
// Used by `send` to register a responder for a request id, and by
// `disconnect` to initiate shutdown.
router: ClientRouter,
// Watched by `disconnect` until the shutdown flag becomes `true`.
shutdown: ShutdownSignal,
}
impl RawClient {
    /// Sends `command` to the daemon and waits for the matching response.
    ///
    /// A fresh request id is allocated, a responder is booked with the router
    /// under that id, and only then is the request handed to the transmitter.
    ///
    /// # Errors
    ///
    /// Returns an error if booking the responder fails, if the request cannot
    /// be handed to the transmitter, or if the delivered result is itself an
    /// error.
    ///
    /// # Panics
    ///
    /// Panics if the responder is dropped without delivering a value.
    pub async fn send(&self, command: String) -> Result<Response> {
        let request_id = next_request_id();
        let (reply_tx, reply_rx) = oneshot::channel();

        self.router.book(request_id, reply_tx)?;
        self.tx.make_request(request_id, command)?;

        let delivered = reply_rx.await;
        delivered.expect("Registered responders always deliver")
    }

    /// Queries the daemon version by sending the `/v` command and parsing the
    /// reply.
    ///
    /// # Errors
    ///
    /// - [`VersionError::Ws`] when sending the command fails,
    /// - [`VersionError::InvalidJson`] when the reply does not deserialize,
    /// - [`VersionError::ParseError`] when the version string cannot be parsed;
    ///   the offending string is carried in the error.
    pub async fn version(&self) -> std::result::Result<SimplexVersion, VersionError> {
        /// Minimal view of the reply: only the `resp` payload is needed.
        #[derive(Deserialize)]
        struct VersionResponse<'a> {
            #[serde(borrow)]
            resp: VersionInfo<'a>,
        }

        let raw = self.send("/v".to_owned()).await?;
        let parsed: VersionResponse<'_> =
            serde_json::from_str(&raw).map_err(VersionError::InvalidJson)?;
        let version_text = parsed.resp.version_info.version;

        version_text
            .parse()
            .map_err(|_| VersionError::ParseError(version_text.to_owned()))
    }

    /// Initiates shutdown and returns a future that completes once the
    /// shutdown flag is observed as `true`.
    ///
    /// The router is told to shut down immediately when this method is called;
    /// awaiting the returned future only waits for completion. A failure while
    /// waiting is ignored.
    pub fn disconnect(mut self) -> impl Future<Output = ()> {
        self.router.shutdown();

        async move {
            let _ = self.shutdown.wait_for(|&finished| finished).await;
        }
    }
}
/// Failure modes of [`RawClient::version`].
#[derive(Debug)]
pub enum VersionError {
/// Sending the version request failed.
Ws(Error),
/// The reply could not be deserialized as the expected JSON.
InvalidJson(serde_json::Error),
/// The version string could not be parsed; holds the offending string.
ParseError(String),
}
impl From<Error> for VersionError {
fn from(value: Error) -> Self {
Self::Ws(value)
}
}
/// User-facing rendering of a [`VersionError`].
impl std::fmt::Display for VersionError {
    /// Writes the message for each variant:
    ///
    /// - `Ws` forwards to the wrapped error's own `Display` output,
    /// - `InvalidJson` prefixes the underlying JSON error,
    /// - `ParseError` shows the expected format and the offending string.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Fully-qualified call: method syntax (`e.fmt(f)`) only resolves
            // when exactly one formatting trait is imported by name, and is
            // ambiguous if both `Display` and `Debug` are in scope.
            Self::Ws(e) => std::fmt::Display::fmt(e, f),
            Self::InvalidJson(e) => write!(f, "Cannot parse the version json: {e}"),
            Self::ParseError(s) => {
                write!(
                    f,
                    "Cannot parse version, expected format: '<major>.<minor>.<patch>.<hotfix>', got {s:?}"
                )
            }
        }
    }
}
/// Exposes the wrapped error, if any, as the cause of a [`VersionError`].
impl std::error::Error for VersionError {
    /// Returns the inner error for `Ws` and `InvalidJson`; `ParseError` has no
    /// underlying cause and yields `None`.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match *self {
            Self::ParseError(_) => None,
            Self::Ws(ref ws_error) => Some(ws_error),
            Self::InvalidJson(ref json_error) => Some(json_error),
        }
    }
}