use std::{sync::Arc, time::Duration};
use futures_util::StreamExt;
use jsonrpc_core::{MetaIoHandler, Params};
use jsonrpc_utils::{
axum_utils::jsonrpc_router,
pub_sub::{add_pub_sub, PublishMsg},
stream::StreamServerConfig,
};
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;
#[tokio::main]
async fn main() {
let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
let (tx, _) = broadcast::channel(8);
tokio::spawn({
let tx = tx.clone();
async move {
for i in 0u64.. {
drop(tx.send(PublishMsg::result(&i)));
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
});
add_pub_sub(
&mut rpc,
"subscribe",
"subscription",
"unsubscribe",
move |_params: Params| {
Ok(BroadcastStream::new(tx.subscribe()).map(|result| {
result.unwrap_or_else(|_| {
PublishMsg::error(&jsonrpc_core::Error {
code: jsonrpc_core::ErrorCode::ServerError(-32000),
message: "lagged".into(),
data: None,
})
})
}))
},
);
let rpc = Arc::new(rpc);
let config = StreamServerConfig::default()
.with_channel_size(4)
.with_pipeline_size(4)
.with_keep_alive(true);
let app = jsonrpc_router("/rpc", rpc.clone(), config);
axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap();
}