wrpc_transport_quic/
lib.rs1use anyhow::Context as _;
4use bytes::Bytes;
5use quinn::{Connection, RecvStream, SendStream, VarInt};
6use tracing::{debug, error, trace, warn};
7use wrpc_transport::frame::{Accept, Incoming, InvokeBuilder, Outgoing};
8use wrpc_transport::Invoke;
9
10pub type Server = wrpc_transport::Server<(), RecvStream, SendStream, ConnHandler>;
12
13#[derive(Clone, Debug)]
15pub struct Client(Connection);
16
17pub struct ConnHandler;
19
20const DONE: VarInt = VarInt::from_u32(1);
21
22impl wrpc_transport::frame::ConnHandler<RecvStream, SendStream> for ConnHandler {
23 async fn on_ingress(mut rx: RecvStream, res: std::io::Result<()>) {
24 if let Err(err) = res {
25 error!(?err, "ingress failed");
26 } else {
27 debug!("ingress successfully complete");
28 }
29 if let Err(err) = rx.stop(DONE) {
30 debug!(?err, "failed to close incoming stream");
31 }
32 }
33
34 async fn on_egress(mut tx: SendStream, res: std::io::Result<()>) {
35 if let Err(err) = res {
36 error!(?err, "egress failed");
37 } else {
38 debug!("egress successfully complete");
39 }
40 match tx.stopped().await {
41 Ok(None) => {
42 trace!("stream successfully closed")
43 }
44 Ok(Some(code)) => {
45 if code == DONE {
46 trace!("stream successfully closed")
47 } else {
48 warn!(?code, "stream closed with code")
49 }
50 }
51 Err(err) => {
52 error!(?err, "failed to await stream close");
53 }
54 }
55 }
56}
57
58impl From<Connection> for Client {
59 fn from(conn: Connection) -> Self {
60 Self(conn)
61 }
62}
63
64impl Invoke for &Client {
65 type Context = ();
66 type Outgoing = Outgoing;
67 type Incoming = Incoming;
68
69 async fn invoke<P>(
70 &self,
71 (): Self::Context,
72 instance: &str,
73 func: &str,
74 params: Bytes,
75 paths: impl AsRef<[P]> + Send,
76 ) -> anyhow::Result<(Self::Outgoing, Self::Incoming)>
77 where
78 P: AsRef<[Option<usize>]> + Send + Sync,
79 {
80 let (tx, rx) = self
81 .0
82 .open_bi()
83 .await
84 .context("failed to open parameter stream")?;
85 InvokeBuilder::<ConnHandler>::default()
86 .invoke(tx, rx, instance, func, params, paths)
87 .await
88 }
89}
90
91impl Invoke for Client {
92 type Context = ();
93 type Outgoing = Outgoing;
94 type Incoming = Incoming;
95
96 async fn invoke<P>(
97 &self,
98 (): Self::Context,
99 instance: &str,
100 func: &str,
101 params: Bytes,
102 paths: impl AsRef<[P]> + Send,
103 ) -> anyhow::Result<(Self::Outgoing, Self::Incoming)>
104 where
105 P: AsRef<[Option<usize>]> + Send + Sync,
106 {
107 (&self).invoke((), instance, func, params, paths).await
108 }
109}
110
111impl Accept for &Client {
112 type Context = ();
113 type Outgoing = SendStream;
114 type Incoming = RecvStream;
115
116 async fn accept(&self) -> std::io::Result<(Self::Context, Self::Outgoing, Self::Incoming)> {
117 let (tx, rx) = self.0.accept_bi().await?;
118 Ok(((), tx, rx))
119 }
120}
121
122impl Accept for Client {
123 type Context = ();
124 type Outgoing = SendStream;
125 type Incoming = RecvStream;
126
127 async fn accept(&self) -> std::io::Result<(Self::Context, Self::Outgoing, Self::Incoming)> {
128 (&self).accept().await
129 }
130}