jetstream_wireformat/
wire_format_extensions.rs1use std::{
5 io::{self},
6 net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6},
7};
8
9use bytes::Bytes;
10
11use super::WireFormat;
12
/// Asynchronous encode/decode contract for wire-format values.
///
/// Both operations return a `Send` future that resolves to an
/// [`io::Result`].
///
/// NOTE(review): the endpoint type parameters `W` and `R` are bounded by
/// `AsyncWireFormat` itself rather than by an async reader/writer trait.
/// Confirm that this is intentional before relying on it.
pub trait AsyncWireFormat: Sized {
    /// Consumes `self` and encodes it into `writer`.
    ///
    /// The returned future resolves to `Ok(())` on success or to the I/O
    /// error reported by the implementation.
    fn encode_async<W>(
        self,
        writer: &mut W,
    ) -> impl std::future::Future<Output = io::Result<()>> + Send
    where
        W: AsyncWireFormat + Unpin + Send;

    /// Decodes a new value of the implementing type from `reader`.
    ///
    /// The returned future resolves to the decoded value or to the I/O
    /// error reported by the implementation.
    fn decode_async<R>(
        reader: &mut R,
    ) -> impl std::future::Future<Output = io::Result<Self>> + Send
    where
        R: AsyncWireFormat + Unpin + Send;
}
22
23#[cfg(not(target_arch = "wasm32"))]
24pub mod tokio {
25 use std::{future::Future, io};
26
27 use tokio::io::{AsyncRead, AsyncWrite};
28
29 use crate::WireFormat;
30 pub trait AsyncWireFormatExt
32 where
33 Self: WireFormat + Send,
34 {
35 fn encode_async<W>(self, writer: W) -> impl Future<Output = io::Result<()>>
45 where
46 Self: Sync,
47 W: AsyncWrite + Unpin + Send,
48 {
49 let mut writer = tokio_util::io::SyncIoBridge::new(writer);
50 async { tokio::task::block_in_place(move || self.encode(&mut writer)) }
51 }
52
53 fn decode_async<R>(reader: R) -> impl Future<Output = io::Result<Self>> + Send
63 where
64 Self: Sync,
65 R: AsyncRead + Unpin + Send,
66 {
67 let mut reader = tokio_util::io::SyncIoBridge::new(reader);
68 async { tokio::task::block_in_place(move || Self::decode(&mut reader)) }
69 }
70 }
71 impl<T: WireFormat + Send> AsyncWireFormatExt for T {}
73}
74
75pub trait ConvertWireFormat: WireFormat {
77 fn to_bytes(&self) -> Bytes;
83
84 fn from_bytes(buf: &Bytes) -> Result<Self, std::io::Error>;
94
95 fn as_bytes(&self) -> Vec<u8> {
101 self.to_bytes().to_vec()
102 }
103}
104
105impl<T> ConvertWireFormat for T
108where
109 T: WireFormat,
110{
111 fn to_bytes(&self) -> Bytes {
114 let mut buf = vec![];
115 let res = self.encode(&mut buf);
116 if let Err(e) = res {
117 panic!("Failed to encode: {}", e);
118 }
119 Bytes::from(buf)
120 }
121
122 fn from_bytes(buf: &Bytes) -> Result<Self, std::io::Error> {
125 let buf = buf.to_vec();
126 T::decode(&mut buf.as_slice())
127 }
128}
129
130#[cfg(feature = "std")]
131impl WireFormat for Ipv4Addr {
132 fn byte_size(&self) -> u32 {
133 self.octets().len() as u32
134 }
135
136 fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
137 writer.write_all(&self.octets())
138 }
139
140 fn decode<R: io::Read>(reader: &mut R) -> io::Result<Self> {
141 let mut buf = [0u8; 4];
142 reader.read_exact(&mut buf)?;
143 Ok(Ipv4Addr::from(buf))
144 }
145}
146
147#[cfg(feature = "std")]
148impl WireFormat for Ipv6Addr {
149 fn byte_size(&self) -> u32 {
150 self.octets().len() as u32
151 }
152
153 fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
154 writer.write_all(&self.octets())
155 }
156
157 fn decode<R: io::Read>(reader: &mut R) -> io::Result<Self> {
158 let mut buf = [0u8; 16];
159 reader.read_exact(&mut buf)?;
160 Ok(Ipv6Addr::from(buf))
161 }
162}
163
164#[cfg(feature = "std")]
165impl WireFormat for SocketAddrV4 {
166 fn byte_size(&self) -> u32 {
167 self.ip().byte_size() + 2
168 }
169
170 fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
171 self.ip().encode(writer)?;
172 self.port().encode(writer)
173 }
174
175 fn decode<R: io::Read>(reader: &mut R) -> io::Result<Self> {
176 self::Ipv4Addr::decode(reader)
177 .and_then(|ip| u16::decode(reader).map(|port| SocketAddrV4::new(ip, port)))
178 }
179}
180
181#[cfg(feature = "std")]
182impl WireFormat for SocketAddrV6 {
183 fn byte_size(&self) -> u32 {
184 self.ip().byte_size() + 2
185 }
186
187 fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
188 self.ip().encode(writer)?;
189 self.port().encode(writer)
190 }
191
192 fn decode<R: io::Read>(reader: &mut R) -> io::Result<Self> {
193 self::Ipv6Addr::decode(reader)
194 .and_then(|ip| u16::decode(reader).map(|port| SocketAddrV6::new(ip, port, 0, 0)))
195 }
196}