use futures::{
future::BoxFuture,
stream::{Stream, StreamExt},
};
mod typed;
pub use typed::{MatchDispatch, MatchDispatchFrom, TypeNotification};
pub fn json_cast<N, M>(params: N) -> Result<M, crate::Error>
where
N: serde::Serialize,
M: serde::de::DeserializeOwned,
{
let json = serde_json::to_value(params).map_err(|e| {
crate::Error::parse_error().data(serde_json::json!({
"error": e.to_string(),
"phase": "serialization"
}))
})?;
let m = serde_json::from_value(json.clone()).map_err(|e| {
crate::Error::parse_error().data(serde_json::json!({
"error": e.to_string(),
"json": json,
"phase": "deserialization"
}))
})?;
Ok(m)
}
pub fn json_cast_params<N, M>(params: N) -> Result<M, crate::Error>
where
N: serde::Serialize,
M: serde::de::DeserializeOwned,
{
let json = serde_json::to_value(params).map_err(|e| {
crate::Error::internal_error().data(serde_json::json!({
"error": e.to_string(),
"phase": "serialization"
}))
})?;
let m = serde_json::from_value(json.clone()).map_err(|e| {
crate::Error::invalid_params().data(serde_json::json!({
"error": e.to_string(),
"json": json,
"phase": "deserialization"
}))
})?;
Ok(m)
}
pub fn internal_error(message: impl ToString) -> crate::Error {
crate::Error::internal_error().data(message.to_string())
}
pub fn parse_error(message: impl ToString) -> crate::Error {
crate::Error::parse_error().data(message.to_string())
}
pub(crate) fn id_to_json(id: &jsonrpcmsg::Id) -> serde_json::Value {
match id {
jsonrpcmsg::Id::Number(n) => serde_json::Value::Number((*n).into()),
jsonrpcmsg::Id::String(s) => serde_json::Value::String(s.clone()),
jsonrpcmsg::Id::Null => serde_json::Value::Null,
}
}
pub(crate) fn instrumented_with_connection_name<F>(
name: String,
task: F,
) -> tracing::instrument::Instrumented<F> {
use tracing::Instrument;
task.instrument(tracing::info_span!("connection", name = name))
}
pub(crate) async fn instrument_with_connection_name<R>(
name: Option<String>,
task: impl Future<Output = R>,
) -> R {
if let Some(name) = name {
instrumented_with_connection_name(name.clone(), task).await
} else {
task.await
}
}
#[must_use]
pub fn into_jsonrpc_error(err: crate::Error) -> crate::jsonrpcmsg::Error {
crate::jsonrpcmsg::Error {
code: err.code.into(),
message: err.message,
data: err.data,
}
}
pub async fn both<E>(
a: impl Future<Output = Result<(), E>>,
b: impl Future<Output = Result<(), E>>,
) -> Result<(), E> {
let ((), ()) = futures::future::try_join(a, b).await?;
Ok(())
}
pub async fn run_until<T, E>(
background: impl Future<Output = Result<(), E>>,
foreground: impl Future<Output = Result<T, E>>,
) -> Result<T, E> {
use futures::future::{Either, select};
use std::pin::pin;
match select(pin!(background), pin!(foreground)).await {
Either::Left((bg_result, fg_future)) => {
bg_result?; fg_future.await
}
Either::Right((fg_result, _bg_future)) => {
fg_result
}
}
}
pub async fn process_stream_concurrently<T, F>(
stream: impl Stream<Item = T>,
process_fn: F,
process_fn_hack: impl for<'a> Fn(&'a F, T) -> BoxFuture<'a, Result<(), crate::Error>>,
) -> Result<(), crate::Error>
where
F: AsyncFn(T) -> Result<(), crate::Error>,
{
use std::pin::pin;
use futures::stream::{FusedStream, FuturesUnordered};
use futures_concurrency::future::Race;
enum Event<T> {
NewItem(Option<T>),
FutureCompleted(Option<Result<(), crate::Error>>),
}
let mut stream = pin!(stream.fuse());
let mut futures: FuturesUnordered<_> = FuturesUnordered::new();
loop {
if futures.is_empty() {
match stream.next().await {
Some(item) => futures.push(process_fn_hack(&process_fn, item)),
None => return Ok(()),
}
continue;
}
if stream.is_terminated() {
while let Some(result) = futures.next().await {
result?;
}
return Ok(());
}
let event = (async { Event::NewItem(stream.next().await) }, async {
Event::FutureCompleted(futures.next().await)
})
.race()
.await;
match event {
Event::NewItem(Some(item)) => {
futures.push(process_fn_hack(&process_fn, item));
}
Event::FutureCompleted(Some(result)) => {
result?;
}
Event::NewItem(None) | Event::FutureCompleted(None) => {
}
}
}
}