use anyhow::Result;
use app::AppService;
use futures_lite::StreamExt;
use futures_util::SinkExt;
use quic_rpc::{client::BoxedConnector, transport::flume, Listener, RpcClient, RpcServer};
/// Entry point: wires an in-process server and client together over a
/// `quic_rpc` flume channel and runs the client demo against it.
#[tokio::main]
async fn main() -> Result<()> {
    let (listener, connector) = flume::channel(1);
    // The server runs as a detached background task; its join handle is
    // intentionally not awaited.
    tokio::task::spawn(run_server(listener, app::Handler::default()));
    let boxed_connector = BoxedConnector::<AppService>::new(connector);
    // The demo's result is the program's result.
    client_demo(boxed_connector).await
}
/// Serves `AppService` requests arriving on `server_conn`, dispatching each
/// one to `handler`.
async fn run_server<C: Listener<AppService>>(server_conn: C, handler: app::Handler) {
    RpcServer::<AppService, C>::new(server_conn)
        .accept_loop(move |request, channel| {
            // `handle_rpc_request` consumes its receiver, so every request
            // is served by a fresh clone of the handler.
            handler.clone().handle_rpc_request(request, channel)
        })
        .await
}
/// Exercises the services through both the typed wrapper clients and "raw"
/// mapped `RpcClient` calls, printing every result.
///
/// The final loop prints clock ticks until the tick stream ends or yields an
/// error.
///
/// # Errors
/// Propagates any RPC failure. Failures while sending sum updates panic
/// instead (they are `unwrap`ped).
pub async fn client_demo(conn: BoxedConnector<AppService>) -> Result<()> {
    let rpc_client = RpcClient::<AppService>::new(conn);
    let client = app::Client::new(rpc_client.clone());

    // Method on the top-level app client.
    let res = client.app_version().await?;
    println!("app_version: {res:?}");

    // Method on the nested iroh.calc wrapper client.
    let res = client.iroh.calc.add(40, 2).await?;
    println!("iroh.calc.add: {res:?}");

    // The same service reached "raw", by mapping the app-level client down
    // to the calc service by hand.
    let res = rpc_client
        .clone()
        .map::<iroh::IrohService>()
        .map::<calc::CalcService>()
        .rpc(calc::AddRequest(19, 4))
        .await?;
    println!("iroh.calc.add (raw): {res:?}");

    // Raw client-streaming call: push the updates, close the sink, then wait
    // for the single response.
    let (mut update_sink, response) = rpc_client
        .map::<iroh::IrohService>()
        .map::<calc::CalcService>()
        .client_streaming(calc::SumRequest)
        .await?;
    for value in [4, 8, 30] {
        update_sink.send(calc::SumUpdate(value)).await.unwrap();
    }
    // Dropping the sink ends the update stream.
    drop(update_sink);
    let res = response.await?;
    println!("iroh.calc.sum (raw): {res:?}");

    // Server-streaming call through the nested iroh.clock wrapper client.
    let mut ticks = client.iroh.clock.tick().await?;
    while let Some(tick) = ticks.try_next().await? {
        println!("iroh.clock.tick: {tick}");
    }
    Ok(())
}
mod app {
use anyhow::Result;
use derive_more::{From, TryInto};
use quic_rpc::{message::RpcMsg, server::RpcChannel, Listener, RpcClient, Service};
use serde::{Deserialize, Serialize};
use super::iroh;
/// Top-level request type of the app service: either a request for the nested
/// iroh service or an app-version query.
#[derive(Debug, Serialize, Deserialize, From, TryInto)]
pub enum Request {
/// Request destined for the nested iroh service.
Iroh(iroh::Request),
/// Request for the application version string.
AppVersion(AppVersionRequest),
}
/// Top-level response type of the app service, mirroring `Request`.
#[derive(Debug, Serialize, Deserialize, From, TryInto)]
pub enum Response {
/// Response produced by the nested iroh service.
Iroh(iroh::Response),
/// Response carrying the application version string.
AppVersion(AppVersionResponse),
}
/// Request message asking the app service for its version string.
#[derive(Debug, Serialize, Deserialize)]
pub struct AppVersionRequest;
// Declares `AppVersionRequest` as an RPC message of `AppService` whose reply
// type is `AppVersionResponse`.
impl RpcMsg<AppService> for AppVersionRequest {
type Response = AppVersionResponse;
}
/// Reply to `AppVersionRequest`; the wrapped `String` is the version.
#[derive(Debug, Serialize, Deserialize)]
pub struct AppVersionResponse(pub String);
/// Marker type identifying the top-level application service.
#[derive(Copy, Clone, Debug)]
pub struct AppService;
// Binds the service to the request and response enums of this module.
impl Service for AppService {
type Req = Request;
type Res = Response;
}
/// Server-side state of the app service.
#[derive(Clone)]
pub struct Handler {
/// Handler that nested iroh requests are delegated to.
iroh: iroh::Handler,
/// Version string returned in reply to `AppVersionRequest`.
app_version: String,
}
impl Default for Handler {
    /// Builds a handler with a default iroh sub-handler and the fixed
    /// version string `"v0.1-alpha"`.
    fn default() -> Self {
        let iroh = iroh::Handler::default();
        let app_version = String::from("v0.1-alpha");
        Self { iroh, app_version }
    }
}
impl Handler {
    /// Dispatches one incoming app-level request.
    ///
    /// Iroh requests are forwarded to the nested iroh handler over a mapped,
    /// boxed channel; version requests are answered by [`Self::on_version`].
    ///
    /// # Errors
    /// Returns the error of the nested handler or of the `rpc` call on the
    /// channel.
    pub async fn handle_rpc_request<E: Listener<AppService>>(
        self,
        req: Request,
        chan: RpcChannel<AppService, E>,
    ) -> Result<()> {
        match req {
            Request::Iroh(req) => {
                self.iroh
                    .handle_rpc_request(req, chan.map().boxed())
                    .await?
            }
            Request::AppVersion(req) => chan.rpc(req, self, Self::on_version).await?,
        };
        Ok(())
    }

    /// Answers an [`AppVersionRequest`] with the handler's version string.
    pub async fn on_version(self, _req: AppVersionRequest) -> AppVersionResponse {
        // `self` is owned and dropped right after, so the string is moved
        // out rather than cloned.
        AppVersionResponse(self.app_version)
    }
}
/// Typed client for the app service.
#[derive(Debug, Clone)]
pub struct Client {
/// Client for the nested iroh service.
pub iroh: iroh::Client,
/// Underlying RPC client used for app-level calls.
client: RpcClient<AppService>,
}
impl Client {
pub fn new(client: RpcClient<AppService>) -> Self {
Self {
client: client.clone(),
iroh: iroh::Client::new(client.map().boxed()),
}
}
pub async fn app_version(&self) -> Result<String> {
let res = self.client.rpc(AppVersionRequest).await?;
Ok(res.0)
}
}
}
mod iroh {
use anyhow::Result;
use derive_more::{From, TryInto};
use quic_rpc::{server::RpcChannel, RpcClient, Service};
use serde::{Deserialize, Serialize};
use super::{calc, clock};
/// Request type of the iroh service: a request for one of its two nested
/// services.
#[derive(Debug, Serialize, Deserialize, From, TryInto)]
pub enum Request {
/// Request destined for the calc service.
Calc(calc::Request),
/// Request destined for the clock service.
Clock(clock::Request),
}
/// Response type of the iroh service, mirroring `Request`.
#[derive(Debug, Serialize, Deserialize, From, TryInto)]
pub enum Response {
/// Response produced by the calc service.
Calc(calc::Response),
/// Response produced by the clock service.
Clock(clock::Response),
}
/// Marker type identifying the iroh service.
#[derive(Copy, Clone, Debug)]
pub struct IrohService;
// Binds the service to the request and response enums of this module.
impl Service for IrohService {
type Req = Request;
type Res = Response;
}
/// Server-side state of the iroh service: one handler per nested service.
#[derive(Clone, Default)]
pub struct Handler {
/// Handles `Request::Calc` requests.
calc: calc::Handler,
/// Handles `Request::Clock` requests.
clock: clock::Handler,
}
impl Handler {
pub async fn handle_rpc_request(
self,
req: Request,
chan: RpcChannel<IrohService>,
) -> Result<()> {
match req {
Request::Calc(req) => {
self.calc
.handle_rpc_request(req, chan.map().boxed())
.await?
}
Request::Clock(req) => {
self.clock
.handle_rpc_request(req, chan.map().boxed())
.await?
}
}
Ok(())
}
}
/// Typed client for the iroh service; only groups the nested clients.
#[derive(Debug, Clone)]
pub struct Client {
/// Client for the calc service.
pub calc: calc::Client,
/// Client for the clock service.
pub clock: clock::Client,
}
impl Client {
    /// Builds the nested calc and clock clients from an iroh-level RPC
    /// client, each over its own mapped, boxed view of it.
    pub fn new(client: RpcClient<IrohService>) -> Self {
        Self {
            calc: calc::Client::new(client.clone().map().boxed()),
            // Last use of `client`: move it instead of cloning once more.
            clock: clock::Client::new(client.map().boxed()),
        }
    }
}
}
mod calc {
use std::fmt::Debug;
use anyhow::{bail, Result};
use derive_more::{From, TryInto};
use futures_lite::{Stream, StreamExt};
use quic_rpc::{
message::{ClientStreaming, ClientStreamingMsg, Msg, RpcMsg},
server::RpcChannel,
RpcClient, Service,
};
use serde::{Deserialize, Serialize};
/// Unary request to add its two fields together.
#[derive(Debug, Serialize, Deserialize)]
pub struct AddRequest(pub i64, pub i64);
// Declares `AddRequest` as an RPC message of `CalcService` whose reply type
// is `AddResponse`.
impl RpcMsg<CalcService> for AddRequest {
type Response = AddResponse;
}
/// Reply to `AddRequest`; wraps the sum of the two operands.
#[derive(Debug, Serialize, Deserialize)]
pub struct AddResponse(pub i64);
/// Initial message that opens a client-streaming sum request.
#[derive(Debug, Serialize, Deserialize)]
pub struct SumRequest;
/// One streamed update of a sum request; wraps the number to add.
#[derive(Debug, Serialize, Deserialize)]
pub struct SumUpdate(pub i64);
// `SumRequest` uses the client-streaming interaction pattern.
impl Msg<CalcService> for SumRequest {
type Pattern = ClientStreaming;
}
// After `SumRequest`, the client streams `SumUpdate`s and receives a single
// `SumResponse`.
impl ClientStreamingMsg<CalcService> for SumRequest {
type Update = SumUpdate;
type Response = SumResponse;
}
/// Final reply to a sum request; wraps the total of all received updates.
#[derive(Debug, Serialize, Deserialize)]
pub struct SumResponse(pub i64);
/// Every message the calc service can receive, stream updates included.
#[derive(Debug, Serialize, Deserialize, From, TryInto)]
pub enum Request {
/// Unary addition request.
Add(AddRequest),
/// Opens a client-streaming sum.
Sum(SumRequest),
/// Update belonging to a sum stream; `Handler::handle_rpc_request` rejects
/// it when it arrives as the first message of a request.
SumUpdate(SumUpdate),
}
/// Every message the calc service can send back.
#[derive(Debug, Serialize, Deserialize, From, TryInto)]
pub enum Response {
/// Reply to `Request::Add`.
Add(AddResponse),
/// Reply to `Request::Sum`.
Sum(SumResponse),
}
/// Marker type identifying the calc service.
#[derive(Copy, Clone, Debug)]
pub struct CalcService;
// Binds the service to the request and response enums of this module.
impl Service for CalcService {
type Req = Request;
type Res = Response;
}
/// Stateless server-side handler of the calc service.
#[derive(Clone, Default)]
pub struct Handler;
impl Handler {
pub async fn handle_rpc_request(
self,
req: Request,
chan: RpcChannel<CalcService>,
) -> Result<()> {
match req {
Request::Add(req) => chan.rpc(req, self, Self::on_add).await?,
Request::Sum(req) => chan.client_streaming(req, self, Self::on_sum).await?,
Request::SumUpdate(_) => bail!("Unexpected update message at start of request"),
}
Ok(())
}
pub async fn on_add(self, req: AddRequest) -> AddResponse {
AddResponse(req.0 + req.1)
}
pub async fn on_sum(
self,
_req: SumRequest,
updates: impl Stream<Item = SumUpdate>,
) -> SumResponse {
let mut sum = 0i64;
tokio::pin!(updates);
while let Some(SumUpdate(n)) = updates.next().await {
sum += n;
}
SumResponse(sum)
}
}
/// Typed client for the calc service.
#[derive(Debug, Clone)]
pub struct Client {
/// Underlying RPC client used for calc calls.
client: RpcClient<CalcService>,
}
impl Client {
pub fn new(client: RpcClient<CalcService>) -> Self {
Self { client }
}
pub async fn add(&self, a: i64, b: i64) -> anyhow::Result<i64> {
let res = self.client.rpc(AddRequest(a, b)).await?;
Ok(res.0)
}
}
}
mod clock {
use std::{
fmt::Debug,
sync::{Arc, RwLock},
time::Duration,
};
use anyhow::Result;
use derive_more::{From, TryInto};
use futures_lite::{stream::Boxed as BoxStream, Stream, StreamExt};
use futures_util::TryStreamExt;
use quic_rpc::{
message::{Msg, ServerStreaming, ServerStreamingMsg},
server::RpcChannel,
RpcClient, Service,
};
use serde::{Deserialize, Serialize};
use tokio::sync::Notify;
/// Request that opens a server-streaming tick subscription.
#[derive(Debug, Serialize, Deserialize)]
pub struct TickRequest;
// `TickRequest` uses the server-streaming interaction pattern.
impl Msg<ClockService> for TickRequest {
type Pattern = ServerStreaming;
}
// Each item streamed back for a `TickRequest` is a `TickResponse`.
impl ServerStreamingMsg<ClockService> for TickRequest {
type Response = TickResponse;
}
/// One item of the tick stream.
#[derive(Debug, Serialize, Deserialize)]
pub struct TickResponse {
/// Value of the handler's tick counter when this item was produced.
tick: usize,
}
/// Every message the clock service can receive.
#[derive(Debug, Serialize, Deserialize, From, TryInto)]
pub enum Request {
/// Opens a tick stream.
Tick(TickRequest),
}
/// Every message the clock service can send back.
#[derive(Debug, Serialize, Deserialize, From, TryInto)]
pub enum Response {
/// One item of a tick stream.
Tick(TickResponse),
}
/// Marker type identifying the clock service.
#[derive(Copy, Clone, Debug)]
pub struct ClockService;
// Binds the service to the request and response enums of this module.
impl Service for ClockService {
type Req = Request;
type Res = Response;
}
/// Server-side state of the clock service. Both fields are `Arc`s, so clones
/// of a handler share the same counter and notifier.
#[derive(Clone)]
pub struct Handler {
/// Tick counter, incremented by the background task spawned in
/// `Handler::new`.
tick: Arc<RwLock<usize>>,
/// Signalled with `notify_waiters` after every increment of `tick`.
ontick: Arc<Notify>,
}
impl Default for Handler {
fn default() -> Self {
Self::new(Duration::from_secs(1))
}
}
impl Handler {
    /// Creates a handler and starts its background ticker.
    ///
    /// The ticker is a spawned Tokio task, so this needs a running Tokio
    /// runtime. The task loops forever: it sleeps for `tick_duration`,
    /// increments the shared counter and wakes every waiting tick stream.
    pub fn new(tick_duration: Duration) -> Self {
        let h = Handler {
            tick: Default::default(),
            ontick: Default::default(),
        };
        let h2 = h.clone();
        tokio::task::spawn(async move {
            loop {
                tokio::time::sleep(tick_duration).await;
                *h2.tick.write().unwrap() += 1;
                h2.ontick.notify_waiters();
            }
        });
        h
    }

    /// Routes a clock request to the matching handler method.
    ///
    /// # Errors
    /// Returns an error if the `server_streaming` call on the channel fails.
    pub async fn handle_rpc_request(
        self,
        req: Request,
        chan: RpcChannel<ClockService>,
    ) -> Result<()> {
        match req {
            Request::Tick(req) => chan.server_streaming(req, self, Self::on_tick).await?,
        }
        Ok(())
    }

    /// Returns the stream of tick responses for one subscription.
    ///
    /// The items are produced by [`Self::on_tick0`] in a spawned task and
    /// handed over through a bounded flume channel of capacity 2. If that
    /// task fails, the error is only logged as a warning.
    pub fn on_tick(
        self,
        req: TickRequest,
    ) -> impl Stream<Item = TickResponse> + Send + 'static {
        let (tx, rx) = flume::bounded(2);
        tokio::task::spawn(async move {
            if let Err(err) = self.on_tick0(req, tx).await {
                tracing::warn!(?err, "on_tick RPC handler failed");
            }
        });
        rx.into_stream()
    }

    /// Sends the current tick on `tx` right away, then again after every
    /// change, until sending fails.
    ///
    /// # Errors
    /// Returns the send error once `tx.send_async` fails.
    ///
    /// # Panics
    /// Panics if the tick lock is poisoned.
    pub async fn on_tick0(
        self,
        _req: TickRequest,
        tx: flume::Sender<TickResponse>,
    ) -> Result<()> {
        // Last value handed to the receiver; prevents sending the same tick
        // twice when a wake-up arrives for a value that was already read.
        let mut last_sent = None;
        loop {
            // Create the `Notified` future BEFORE reading the counter. It
            // observes every `notify_waiters()` call made after its creation,
            // so a tick that fires while the send below is pending is not
            // lost, as it would be if `notified()` were called after the send.
            let notified = self.ontick.notified();
            let tick = *self.tick.read().unwrap();
            if last_sent != Some(tick) {
                tx.send_async(TickResponse { tick }).await?;
                last_sent = Some(tick);
            }
            notified.await;
        }
    }
}
/// Typed client for the clock service.
#[derive(Debug, Clone)]
pub struct Client {
/// Underlying RPC client used for clock calls.
client: RpcClient<ClockService>,
}
impl Client {
pub fn new(client: RpcClient<ClockService>) -> Self {
Self { client }
}
pub async fn tick(&self) -> Result<BoxStream<Result<usize>>> {
let res = self.client.server_streaming(TickRequest).await?;
Ok(res.map_ok(|r| r.tick).map_err(anyhow::Error::from).boxed())
}
}
}