1use std::{
4 borrow::{Borrow, BorrowMut},
5 fmt,
6 future::Future,
7};
8
9use bytes::{Buf, BufMut, BytesMut};
10use monoio::{
11 buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut, IoVecWrapperMut, SliceMut},
12 io::{sink::Sink, stream::Stream, AsyncReadRent, AsyncWriteRent, AsyncWriteRentExt},
13 BufResult,
14};
15
16use crate::{Decoded, Decoder, Encoder};
17
/// Capacity, in bytes, given to freshly created read and write buffers. It also
/// serves as the size threshold for bypassing the internal buffers in the
/// `AsyncReadRent::read` and `AsyncWriteRent::write` implementations below.
const INITIAL_CAPACITY: usize = 8 * 1024;
/// Write-buffer length at or above which buffered bytes are flushed before more
/// data is queued.
const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY;
/// Minimum number of spare bytes reserved in the read buffer before a read.
const RESERVE: usize = 4096;
21
/// Shared core behind [`Framed`], [`FramedRead`] and [`FramedWrite`].
///
/// Bundles the wrapped IO object, a codec and the buffering state `S`. In this
/// file `S` is one of [`ReadState`], [`WriteState`] or [`RWState`]; the methods
/// only require that `S` can be borrowed as the state they need.
pub struct FramedInner<IO, Codec, S> {
    /// The wrapped IO object that bytes are read from / written to.
    io: IO,
    /// Codec used by the `Stream` / `Sink` implementations.
    codec: Codec,
    /// Read and/or write buffering state.
    state: S,
}
27
/// Read-side state: the framing state machine plus the buffer of received bytes.
#[derive(Debug)]
pub struct ReadState {
    /// Where the read side currently is (framing, pausing, paused, errored).
    state: State,
    /// Bytes read from the IO that have not been consumed yet.
    buffer: BytesMut,
}
33
34impl ReadState {
35 #[inline]
36 fn with_capacity(capacity: usize) -> Self {
37 Self {
38 state: State::Framing(None),
39 buffer: BytesMut::with_capacity(capacity),
40 }
41 }
42}
43
44impl Default for ReadState {
45 #[inline]
46 fn default() -> Self {
47 Self::with_capacity(INITIAL_CAPACITY)
48 }
49}
50
/// Progress of the read side, as driven by the read paths of `FramedInner`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// Decoding frames. The optional value is the size hint last reported by the
    /// decoder through `Decoded::InsufficientAtLeast`; decoding is skipped while
    /// the buffer holds fewer bytes than the hint.
    Framing(Option<usize>),
    /// A read returned zero bytes; `next_with` offers whatever is still buffered
    /// to the decoder's `decode_eof`.
    Pausing,
    /// Nothing more could be decoded; the next `next_with` call reads from the IO
    /// again and returns to `Framing` if bytes arrive.
    Paused,
    /// A read or decode failed; the next `next_with` call yields `None` and moves
    /// to `Paused`.
    Errored,
}
58
/// Write-side state: the buffer of encoded bytes not yet written to the IO.
#[derive(Debug)]
pub struct WriteState {
    /// Encoded bytes waiting to be flushed.
    buffer: BytesMut,
}
63
64impl Default for WriteState {
65 #[inline]
66 fn default() -> Self {
67 Self {
68 buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
69 }
70 }
71}
72
/// Combined state for bidirectional framing; it can be borrowed as either
/// [`ReadState`] or [`WriteState`].
#[derive(Debug, Default)]
pub struct RWState {
    /// State used by the reading half.
    read: ReadState,
    /// State used by the writing half.
    write: WriteState,
}
78
79impl Borrow<ReadState> for RWState {
80 #[inline]
81 fn borrow(&self) -> &ReadState {
82 &self.read
83 }
84}
85impl BorrowMut<ReadState> for RWState {
86 #[inline]
87 fn borrow_mut(&mut self) -> &mut ReadState {
88 &mut self.read
89 }
90}
91impl Borrow<WriteState> for RWState {
92 #[inline]
93 fn borrow(&self) -> &WriteState {
94 &self.write
95 }
96}
97impl BorrowMut<WriteState> for RWState {
98 #[inline]
99 fn borrow_mut(&mut self) -> &mut WriteState {
100 &mut self.write
101 }
102}
103
104impl<IO, Codec, S> FramedInner<IO, Codec, S> {
    /// Assembles a `FramedInner` from its three parts without any further setup.
    #[inline]
    const fn new(io: IO, codec: Codec, state: S) -> Self {
        Self { io, codec, state }
    }
109
    /// Returns the bytes currently held in the read buffer, reading from `io`
    /// once when the buffer is empty.
    ///
    /// Data that is already buffered is returned as-is without touching `io`.
    /// Otherwise at least `RESERVE` bytes of capacity are reserved and a single
    /// read is issued over the buffer's whole capacity.
    ///
    /// # Errors
    ///
    /// A failed read marks the read state as `State::Errored` and returns the
    /// error. A read of zero bytes is not an error: the state becomes
    /// `State::Paused` and the (still empty) buffer is returned.
    async fn peek_data<'a, 'b>(io: &'b mut IO, state: &'a mut S) -> std::io::Result<&'a mut [u8]>
    where
        IO: AsyncReadRent,
        S: BorrowMut<ReadState>,
    {
        let read_state: &mut ReadState = state.borrow_mut();
        let state = &mut read_state.state;
        let buffer = &mut read_state.buffer;

        // Fast path: hand out whatever is already buffered.
        if !buffer.is_empty() {
            return Ok(buffer.as_mut());
        }
        buffer.reserve(RESERVE);

        // Unwraps an `Ok`; on `Err` it records the failure in the state machine
        // and returns the error from the enclosing function.
        macro_rules! ok {
            ($result: expr, $state: expr) => {
                match $result {
                    Ok(x) => x,
                    Err(e) => {
                        *$state = State::Errored;
                        return Err(e);
                    }
                }
            };
        }

        let end = buffer.capacity();
        // The buffer is empty at this point, so the writable region starts at 0.
        // The buffer is moved into the read by value and restored right after it.
        let owned_buf = std::mem::take(buffer);
        // NOTE(review): `new_unchecked` skips range validation; `0..end` is
        // presumably in bounds because `end` is the buffer's own capacity —
        // confirm against monoio's `SliceMut` contract.
        let owned_slice = unsafe { SliceMut::new_unchecked(owned_buf, 0, end) };
        let (result, owned_slice) = io.read(owned_slice).await;
        // Put the buffer back before inspecting the result so it is kept on error.
        *buffer = owned_slice.into_inner();
        let n = ok!(result, state);
        if n == 0 {
            *state = State::Paused;
        }
        Ok(buffer.as_mut())
    }
148
    /// Produces the next decoded item, reading from `io` as needed.
    ///
    /// This is the read-side state machine:
    ///
    /// * `Framing`: decode from the buffer; if the decoder needs more data, read
    ///   and try again. A read of zero bytes moves to `Pausing`.
    /// * `Pausing`: give the remaining bytes to `decode_eof`; anything other than
    ///   a decoded item moves to `Paused` and yields `None`.
    /// * `Paused`: read once; zero bytes yields `None`, otherwise framing resumes.
    /// * `Errored`: yield `None` and move to `Paused`.
    ///
    /// Returns `Some(Ok(item))` for a decoded item, `Some(Err(_))` when a read or
    /// decode fails (the state becomes `Errored`), and `None` as described above.
    async fn next_with(
        io: &mut IO,
        codec: &mut Codec,
        state: &mut S,
    ) -> Option<Result<Codec::Item, Codec::Error>>
    where
        IO: AsyncReadRent,
        Codec: Decoder,
        S: BorrowMut<ReadState>,
    {
        // Unwraps an `Ok`; on `Err` it marks the state as errored and returns the
        // converted error as the stream item.
        macro_rules! ok {
            ($result: expr, $state: expr) => {
                match $result {
                    Ok(x) => x,
                    Err(e) => {
                        *$state = State::Errored;
                        return Some(Err(e.into()));
                    }
                }
            };
        }

        let read_state: &mut ReadState = state.borrow_mut();
        let state = &mut read_state.state;
        let buffer = &mut read_state.buffer;

        loop {
            match state {
                // While framing, decode first; if the decoder needs more data,
                // read and loop. Errors and zero-length reads change the state.
                State::Framing(hint) => loop {
                    // Skip decoding when the buffer is empty, or when a size hint
                    // is set and the buffer has not reached it yet.
                    if !matches!(hint, Some(size) if buffer.len() < *size) && !buffer.is_empty() {
                        *hint = match ok!(codec.decode(buffer), state) {
                            Decoded::Some(item) => {
                                // An item was produced, so the old hint no longer applies.
                                *hint = None;
                                return Some(Ok(item));
                            }
                            Decoded::Insufficient => None,
                            Decoded::InsufficientAtLeast(size) => Some(size),
                        };
                    }

                    // Reserve enough room for the hinted frame, and never less
                    // than `RESERVE`.
                    let reserve = match *hint {
                        Some(size) if size > buffer.len() => RESERVE.max(size - buffer.len()),
                        _ => RESERVE,
                    };
                    buffer.reserve(reserve);
                    // Work out the spare-capacity range as offsets from the
                    // buffer's write pointer.
                    let (begin, end) = {
                        let buffer_ptr = buffer.write_ptr();
                        let slice_to_write = buffer.chunk_mut();
                        let begin =
                            unsafe { slice_to_write.as_mut_ptr().offset_from(buffer_ptr) } as usize;
                        let end = begin + slice_to_write.len();
                        (begin, end)
                    };
                    // Move the buffer into the read by value and restore it afterwards.
                    let owned_buf = std::mem::take(buffer);
                    // NOTE(review): `begin..end` is derived from the buffer's own
                    // spare capacity above; confirm this satisfies the
                    // `SliceMut::new_unchecked` contract.
                    let owned_slice = unsafe { SliceMut::new_unchecked(owned_buf, begin, end) };
                    let (result, owned_slice) = io.read(owned_slice).await;
                    *buffer = owned_slice.into_inner();
                    let n = ok!(result, state);
                    if n == 0 {
                        // Nothing was read: leave the framing loop and drain via `decode_eof`.
                        *state = State::Pausing;
                        break;
                    }
                },
                // While pausing, each call runs `decode_eof` once and returns its item
                // until it stops producing one.
                State::Pausing => {
                    return match ok!(codec.decode_eof(buffer), state) {
                        Decoded::Some(item) => Some(Ok(item)),
                        _ => {
                            // No further item could be decoded; become paused.
                            *state = State::Paused;
                            None
                        }
                    };
                }
                // While paused, read directly to see whether more data has arrived.
                State::Paused => {
                    buffer.reserve(RESERVE);
                    let (begin, end) = {
                        let buffer_ptr = buffer.write_ptr();
                        let slice_to_write = buffer.chunk_mut();
                        let begin =
                            unsafe { slice_to_write.as_mut_ptr().offset_from(buffer_ptr) } as usize;
                        let end = begin + slice_to_write.len();
                        (begin, end)
                    };
                    let owned_buf = std::mem::take(buffer);
                    let owned_slice = unsafe { SliceMut::new_unchecked(owned_buf, begin, end) };
                    let (result, owned_slice) = io.read(owned_slice).await;
                    *buffer = owned_slice.into_inner();
                    let n = ok!(result, state);
                    if n == 0 {
                        // Still nothing to read; stay paused.
                        return None;
                    }
                    // Something was read, so go back to framing with no hint.
                    *state = State::Framing(None);
                }
                // After an error, report the end of the stream once and fall back to paused.
                State::Errored => {
                    *state = State::Paused;
                    return None;
                }
            }
        }
    }
265
266 async fn flush(io: &mut IO, state: &mut S) -> std::io::Result<()>
267 where
268 IO: AsyncWriteRent,
269 S: BorrowMut<WriteState>,
270 {
271 let WriteState { buffer } = state.borrow_mut();
272 if buffer.is_empty() {
273 return Ok(());
274 }
275 let buf = std::mem::take(buffer);
277 let (result, buf) = io.write_all(buf).await;
278 *buffer = buf;
279 result?;
280 buffer.clear();
281 io.flush().await?;
282 Ok(())
283 }
284
285 #[inline]
286 async fn send_with<Item>(
287 io: &mut IO,
288 codec: &mut Codec,
289 state: &mut S,
290 item: Item,
291 ) -> Result<(), Codec::Error>
292 where
293 IO: AsyncWriteRent,
294 Codec: Encoder<Item>,
295 S: BorrowMut<WriteState>,
296 {
297 if state.borrow_mut().buffer.len() >= BACKPRESSURE_BOUNDARY {
298 Self::flush(io, state).await?;
299 }
300 codec.encode(item, &mut state.borrow_mut().buffer)?;
301 Ok(())
302 }
303}
304
305impl<IO, Codec, S> AsyncReadRent for FramedInner<IO, Codec, S>
306where
307 IO: AsyncReadRent,
308 S: BorrowMut<ReadState>,
309{
    /// Reads raw bytes, serving them from the internal read buffer first.
    ///
    /// * A destination with no capacity returns `Ok(0)` immediately.
    /// * If the internal buffer holds data, as much as fits is copied out and
    ///   returned without touching the IO.
    /// * If the destination is larger than `INITIAL_CAPACITY`, the IO reads
    ///   straight into it.
    /// * Otherwise the IO reads into the internal buffer and the result is
    ///   copied into the destination.
    ///
    /// A read of zero bytes from the IO sets the state to `State::Pausing`; an IO
    /// error sets it to `State::Errored` and is returned.
    async fn read<T: IoBufMut>(&mut self, mut buf: T) -> BufResult<usize, T> {
        let read_state: &mut ReadState = self.state.borrow_mut();
        let state = &mut read_state.state;
        let buffer = &mut read_state.buffer;

        if buf.bytes_total() == 0 {
            return (Ok(0), buf);
        }

        // Copy out already-buffered data if there is any.
        let to_copy = buf.bytes_total().min(buffer.len());
        if to_copy != 0 {
            // `to_copy` is bounded by both the destination's total size and the
            // number of buffered bytes.
            unsafe {
                buf.write_ptr()
                    .copy_from_nonoverlapping(buffer.as_ptr(), to_copy);
                buf.set_init(to_copy);
            }
            buffer.advance(to_copy);
            return (Ok(to_copy), buf);
        }

        // Large destinations are read into directly, skipping the extra copy.
        if buf.bytes_total() > INITIAL_CAPACITY {
            let (res, buf) = self.io.read(buf).await;
            return match res {
                Ok(0) => {
                    *state = State::Pausing;
                    (Ok(0), buf)
                }
                Ok(n) => (Ok(n), buf),
                Err(e) => {
                    *state = State::Errored;
                    (Err(e), buf)
                }
            };
        }
        // Small destinations: read into the internal buffer, then copy.
        buffer.reserve(INITIAL_CAPACITY);
        // The buffer is moved into the read by value and restored right after it.
        let owned_buffer = std::mem::take(buffer);
        let (res, owned_buffer) = self.io.read(owned_buffer).await;
        *buffer = owned_buffer;
        match res {
            Ok(0) => {
                *state = State::Pausing;
                return (Ok(0), buf);
            }
            Err(e) => {
                *state = State::Errored;
                return (Err(e), buf);
            }
            _ => (),
        }
        let to_copy = buf.bytes_total().min(buffer.len());
        unsafe {
            buf.write_ptr()
                .copy_from_nonoverlapping(buffer.as_ptr(), to_copy);
            buf.set_init(to_copy);
        }
        buffer.advance(to_copy);
        (Ok(to_copy), buf)
    }
371
372 async fn readv<T: IoVecBufMut>(&mut self, mut buf: T) -> BufResult<usize, T> {
373 let slice = match IoVecWrapperMut::new(buf) {
374 Ok(slice) => slice,
375 Err(buf) => return (Ok(0), buf),
376 };
377
378 let (result, slice) = self.read(slice).await;
379 buf = slice.into_inner();
380 if let Ok(n) = result {
381 unsafe { buf.set_init(n) };
382 }
383 (result, buf)
384 }
385}
386
387impl<IO, Codec, S> Stream for FramedInner<IO, Codec, S>
388where
389 IO: AsyncReadRent,
390 Codec: Decoder,
391 S: BorrowMut<ReadState>,
392{
393 type Item = Result<Codec::Item, Codec::Error>;
394
395 #[inline]
396 async fn next(&mut self) -> Option<Self::Item> {
397 Self::next_with(&mut self.io, &mut self.codec, &mut self.state).await
398 }
399}
400
401impl<IO, Codec, S> AsyncWriteRent for FramedInner<IO, Codec, S>
402where
403 IO: AsyncWriteRent,
404 S: BorrowMut<WriteState>,
405{
    /// Writes raw bytes, buffering small writes and passing large ones through.
    ///
    /// If the write buffer already holds `BACKPRESSURE_BOUNDARY` bytes or more,
    /// or `buf` holds `INITIAL_CAPACITY` bytes or more, the buffer is flushed and
    /// `buf` is written to the IO directly with `write_all`. Otherwise as much of
    /// `buf` as fits in the buffer's spare capacity is copied into the buffer and
    /// that byte count is returned.
    async fn write<T: monoio::buf::IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
        let WriteState { buffer } = self.state.borrow_mut();
        if buffer.len() >= BACKPRESSURE_BOUNDARY || buf.bytes_init() >= INITIAL_CAPACITY {
            // Flush what is buffered first so that byte order is preserved.
            if let Err(e) = AsyncWriteRent::flush(self).await {
                return (Err(e), buf);
            }
            // Then write the caller's data directly, bypassing the buffer.
            return self.io.write_all(buf).await;
        }
        // Copy into the buffer without growing it: at most the spare capacity is taken.
        let cap = buffer.capacity() - buffer.len();
        // NOTE(review): if the spare capacity were ever 0 here, a non-empty `buf`
        // would yield `Ok(0)`; confirm the buffer's capacity cannot drop that low.
        let size = buf.bytes_init().min(cap);
        let slice = unsafe { std::slice::from_raw_parts(buf.read_ptr(), size) };
        buffer.extend_from_slice(slice);
        (Ok(size), buf)
    }
423
424 #[inline]
425 async fn writev<T: monoio::buf::IoVecBuf>(&mut self, buf: T) -> BufResult<usize, T> {
426 let slice = match monoio::buf::IoVecWrapper::new(buf) {
427 Ok(slice) => slice,
428 Err(buf) => return (Ok(0), buf),
429 };
430
431 let (result, slice) = self.write(slice).await;
432 (result, slice.into_inner())
433 }
434
435 #[inline]
436 async fn flush(&mut self) -> std::io::Result<()> {
437 FramedInner::<_, Codec, _>::flush(&mut self.io, &mut self.state).await
438 }
439
440 #[inline]
441 async fn shutdown(&mut self) -> std::io::Result<()> {
442 AsyncWriteRent::flush(self).await?;
443 self.io.shutdown().await?;
444 Ok(())
445 }
446}
447
448impl<IO, Codec, S, Item> Sink<Item> for FramedInner<IO, Codec, S>
449where
450 IO: AsyncWriteRent,
451 Codec: Encoder<Item>,
452 S: BorrowMut<WriteState>,
453{
454 type Error = Codec::Error;
455
456 #[inline]
457 async fn send(&mut self, item: Item) -> Result<(), Self::Error> {
458 if self.state.borrow_mut().buffer.len() >= BACKPRESSURE_BOUNDARY {
459 FramedInner::<_, Codec, _>::flush(&mut self.io, &mut self.state).await?;
460 }
461 self.codec
462 .encode(item, &mut self.state.borrow_mut().buffer)?;
463 Ok(())
464 }
465
466 #[inline]
467 async fn flush(&mut self) -> Result<(), Self::Error> {
468 AsyncWriteRent::flush(self).await?;
469 Ok(())
470 }
471
472 #[inline]
473 async fn close(&mut self) -> Result<(), Self::Error> {
474 AsyncWriteRent::shutdown(self).await?;
475 Ok(())
476 }
477}
478
/// Bidirectional framed wrapper around an IO object: it carries both read and
/// write state ([`RWState`]) together with a codec.
pub struct Framed<IO, Codec> {
    inner: FramedInner<IO, Codec, RWState>,
}
482
/// Read-only framed wrapper around an IO object: it carries only
/// [`ReadState`] together with a decoder.
pub struct FramedRead<IO, Codec> {
    inner: FramedInner<IO, Codec, ReadState>,
}
486
/// Write-only framed wrapper around an IO object: it carries only
/// [`WriteState`] together with an encoder.
pub struct FramedWrite<IO, Codec> {
    inner: FramedInner<IO, Codec, WriteState>,
}
490
491impl<IO, Codec> Framed<IO, Codec> {
492 #[inline]
493 pub fn new(io: IO, codec: Codec) -> Self {
494 Self {
495 inner: FramedInner::new(io, codec, RWState::default()),
496 }
497 }
498
499 #[inline]
500 pub fn with_capacity(io: IO, codec: Codec, capacity: usize) -> Self {
501 Self {
502 inner: FramedInner::new(
503 io,
504 codec,
505 RWState {
506 read: ReadState::with_capacity(capacity),
507 write: Default::default(),
508 },
509 ),
510 }
511 }
512
513 #[inline]
520 pub fn get_ref(&self) -> &IO {
521 &self.inner.io
522 }
523
524 #[inline]
531 pub fn get_mut(&mut self) -> &mut IO {
532 &mut self.inner.io
533 }
534
535 #[inline]
541 pub fn codec(&self) -> &Codec {
542 &self.inner.codec
543 }
544
545 #[inline]
551 pub fn codec_mut(&mut self) -> &mut Codec {
552 &mut self.inner.codec
553 }
554
555 #[inline]
561 pub fn map_codec<CodecNew, F>(self, map: F) -> Framed<IO, CodecNew>
562 where
563 F: FnOnce(Codec) -> CodecNew,
564 {
565 let FramedInner { io, codec, state } = self.inner;
566 Framed {
567 inner: FramedInner {
568 io,
569 codec: map(codec),
570 state,
571 },
572 }
573 }
574
575 #[inline]
577 pub fn read_buffer(&self) -> &BytesMut {
578 &self.inner.state.read.buffer
579 }
580
581 #[inline]
583 pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
584 &mut self.inner.state.read.buffer
585 }
586
587 #[inline]
589 pub fn read_state_mut(&mut self) -> (&mut IO, &mut BytesMut) {
590 (&mut self.inner.io, &mut self.inner.state.read.buffer)
591 }
592
593 #[inline]
595 pub fn write_buffer(&self) -> &BytesMut {
596 &self.inner.state.write.buffer
597 }
598
599 #[inline]
601 pub fn write_buffer_mut(&mut self) -> &mut BytesMut {
602 &mut self.inner.state.write.buffer
603 }
604
605 #[inline]
611 pub fn into_inner(self) -> IO {
612 self.inner.io
613 }
614
615 #[inline]
617 pub async fn next_with<C: Decoder>(
618 &mut self,
619 codec: &mut C,
620 ) -> Option<Result<C::Item, C::Error>>
621 where
622 IO: AsyncReadRent,
623 {
624 FramedInner::next_with(&mut self.inner.io, codec, &mut self.inner.state).await
625 }
626
627 pub fn peek_data(&mut self) -> impl Future<Output = std::io::Result<&mut [u8]>>
630 where
631 IO: AsyncReadRent,
632 {
633 FramedInner::<_, Codec, _>::peek_data(&mut self.inner.io, &mut self.inner.state)
634 }
635
636 #[inline]
638 pub async fn send_with<C: Encoder<Item>, Item>(
639 &mut self,
640 codec: &mut C,
641 item: Item,
642 ) -> Result<(), C::Error>
643 where
644 IO: AsyncWriteRent,
645 C: Encoder<Item>,
646 {
647 FramedInner::send_with(&mut self.inner.io, codec, &mut self.inner.state, item).await
648 }
649}
650
651impl<T, U> fmt::Debug for Framed<T, U>
652where
653 T: fmt::Debug,
654 U: fmt::Debug,
655{
656 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
657 f.debug_struct("Framed")
658 .field("io", self.get_ref())
659 .field("codec", self.codec())
660 .finish()
661 }
662}
663
664impl<IO, Codec> FramedRead<IO, Codec> {
665 pub fn new(io: IO, decoder: Codec) -> Self {
666 Self {
667 inner: FramedInner::new(io, decoder, ReadState::default()),
668 }
669 }
670
671 pub fn with_capacity(io: IO, codec: Codec, capacity: usize) -> Self {
672 Self {
673 inner: FramedInner::new(io, codec, ReadState::with_capacity(capacity)),
674 }
675 }
676
677 pub fn get_ref(&self) -> &IO {
684 &self.inner.io
685 }
686
687 pub fn get_mut(&mut self) -> &mut IO {
694 &mut self.inner.io
695 }
696
697 pub fn into_inner(self) -> IO {
703 self.inner.io
704 }
705
706 pub fn decoder(&self) -> &Codec {
708 &self.inner.codec
709 }
710
711 pub fn decoder_mut(&mut self) -> &mut Codec {
713 &mut self.inner.codec
714 }
715
716 pub fn map_decoder<CodecNew, F>(self, map: F) -> FramedRead<IO, CodecNew>
719 where
720 F: FnOnce(Codec) -> CodecNew,
721 {
722 let FramedInner { io, codec, state } = self.inner;
723 FramedRead {
724 inner: FramedInner {
725 io,
726 codec: map(codec),
727 state,
728 },
729 }
730 }
731
732 pub fn read_buffer(&self) -> &BytesMut {
734 &self.inner.state.buffer
735 }
736
737 pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
739 &mut self.inner.state.buffer
740 }
741
742 pub fn read_state_mut(&mut self) -> (&mut IO, &mut BytesMut) {
744 (&mut self.inner.io, &mut self.inner.state.buffer)
745 }
746
747 pub async fn next_with<C: Decoder>(
749 &mut self,
750 codec: &mut C,
751 ) -> Option<Result<C::Item, C::Error>>
752 where
753 IO: AsyncReadRent,
754 {
755 FramedInner::next_with(&mut self.inner.io, codec, &mut self.inner.state).await
756 }
757
758 pub fn peek_data(&mut self) -> impl Future<Output = std::io::Result<&mut [u8]>>
761 where
762 IO: AsyncReadRent,
763 {
764 FramedInner::<_, Codec, _>::peek_data(&mut self.inner.io, &mut self.inner.state)
765 }
766}
767
768impl<T, D> fmt::Debug for FramedRead<T, D>
769where
770 T: fmt::Debug,
771 D: fmt::Debug,
772{
773 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
774 f.debug_struct("FramedRead")
775 .field("inner", &self.get_ref())
776 .field("decoder", &self.decoder())
777 .field("state", &self.inner.state.state)
778 .field("buffer", &self.read_buffer())
779 .finish()
780 }
781}
782
783impl<IO, Codec> FramedWrite<IO, Codec> {
784 pub fn new(io: IO, encoder: Codec) -> Self {
785 Self {
786 inner: FramedInner::new(io, encoder, WriteState::default()),
787 }
788 }
789
790 pub fn get_ref(&self) -> &IO {
797 &self.inner.io
798 }
799
800 pub fn get_mut(&mut self) -> &mut IO {
807 &mut self.inner.io
808 }
809
810 pub fn into_inner(self) -> IO {
816 self.inner.io
817 }
818
819 pub fn encoder(&self) -> &Codec {
821 &self.inner.codec
822 }
823
824 pub fn encoder_mut(&mut self) -> &mut Codec {
826 &mut self.inner.codec
827 }
828
829 pub fn map_encoder<CodecNew, F>(self, map: F) -> FramedWrite<IO, CodecNew>
832 where
833 F: FnOnce(Codec) -> CodecNew,
834 {
835 let FramedInner { io, codec, state } = self.inner;
836 FramedWrite {
837 inner: FramedInner {
838 io,
839 codec: map(codec),
840 state,
841 },
842 }
843 }
844
845 pub fn write_buffer(&self) -> &BytesMut {
847 &self.inner.state.buffer
848 }
849
850 pub fn write_buffer_mut(&mut self) -> &mut BytesMut {
852 &mut self.inner.state.buffer
853 }
854}
855
856impl<T, U> fmt::Debug for FramedWrite<T, U>
857where
858 T: fmt::Debug,
859 U: fmt::Debug,
860{
861 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
862 f.debug_struct("FramedWrite")
863 .field("inner", &self.get_ref())
864 .field("encoder", &self.encoder())
865 .field("buffer", &self.inner.state.buffer)
866 .finish()
867 }
868}
869
870impl<IO, Codec> Stream for Framed<IO, Codec>
871where
872 IO: AsyncReadRent,
873 Codec: Decoder,
874{
875 type Item = <FramedInner<IO, Codec, RWState> as Stream>::Item;
876
877 #[inline]
878 async fn next(&mut self) -> Option<Self::Item> {
879 self.inner.next().await
880 }
881}
882
883impl<IO, Codec> Stream for FramedRead<IO, Codec>
884where
885 IO: AsyncReadRent,
886 Codec: Decoder,
887{
888 type Item = <FramedInner<IO, Codec, ReadState> as Stream>::Item;
889
890 #[inline]
891 async fn next(&mut self) -> Option<Self::Item> {
892 self.inner.next().await
893 }
894}
895
896impl<IO, Codec, Item> Sink<Item> for Framed<IO, Codec>
897where
898 IO: AsyncWriteRent,
899 Codec: Encoder<Item>,
900{
901 type Error = <FramedInner<IO, Codec, RWState> as Sink<Item>>::Error;
902
903 #[inline]
904 async fn send(&mut self, item: Item) -> Result<(), Self::Error> {
905 self.inner.send(item).await
906 }
907
908 #[inline]
909 async fn flush(&mut self) -> Result<(), Self::Error> {
910 Sink::flush(&mut self.inner).await
911 }
912
913 #[inline]
914 async fn close(&mut self) -> Result<(), Self::Error> {
915 self.inner.close().await
916 }
917}
918
919impl<IO, Codec, Item> Sink<Item> for FramedWrite<IO, Codec>
920where
921 IO: AsyncWriteRent,
922 Codec: Encoder<Item>,
923{
924 type Error = <FramedInner<IO, Codec, WriteState> as Sink<Item>>::Error;
925
926 #[inline]
927 async fn send(&mut self, item: Item) -> Result<(), Self::Error> {
928 self.inner.send(item).await
929 }
930
931 #[inline]
932 async fn flush(&mut self) -> Result<(), Self::Error> {
933 Sink::flush(&mut self.inner).await
934 }
935
936 #[inline]
937 async fn close(&mut self) -> Result<(), Self::Error> {
938 self.inner.close().await
939 }
940}
941
/// A stream whose items are decoded with a codec supplied per call rather than
/// one stored in the stream itself.
pub trait StreamWithCodec<T> {
    /// The value produced by each call to [`StreamWithCodec::next_with`].
    type Item;

    /// Produces the next item using `codec`; `None` means no item is available.
    fn next_with<'a>(&'a mut self, codec: &'a mut T) -> impl Future<Output = Option<Self::Item>>;
}
947
/// A sink whose items are encoded with a codec supplied per call rather than
/// one stored in the sink itself.
pub trait SinkWithCodec<T, Item>
where
    T: Encoder<Item>,
{
    /// Sends `item`, encoding it with `codec`.
    fn send_with<'a>(
        &'a mut self,
        codec: &'a mut T,
        item: Item,
    ) -> impl Future<Output = Result<(), T::Error>>;

    /// Flushes the sink; failures are reported as the codec's error type.
    fn flush(&mut self) -> impl Future<Output = Result<(), T::Error>>;
}
960
961impl<Codec: Decoder, IO: AsyncReadRent, AnyCodec> StreamWithCodec<Codec>
962 for FramedRead<IO, AnyCodec>
963{
964 type Item = Result<Codec::Item, Codec::Error>;
965
966 #[inline]
967 async fn next_with<'a>(&'a mut self, codec: &'a mut Codec) -> Option<Self::Item> {
968 FramedInner::next_with(&mut self.inner.io, codec, &mut self.inner.state).await
969 }
970}
971
972impl<Codec: Decoder, IO: AsyncReadRent, AnyCodec> StreamWithCodec<Codec> for Framed<IO, AnyCodec> {
973 type Item = Result<Codec::Item, Codec::Error>;
974
975 #[inline]
976 async fn next_with<'a>(&'a mut self, codec: &'a mut Codec) -> Option<Self::Item> {
977 FramedInner::next_with(&mut self.inner.io, codec, &mut self.inner.state).await
978 }
979}
980
981impl<Codec: Encoder<Item>, IO: AsyncWriteRent, AnyCodec, Item> SinkWithCodec<Codec, Item>
982 for FramedWrite<IO, AnyCodec>
983{
984 #[inline]
985 async fn send_with<'a>(
986 &'a mut self,
987 codec: &'a mut Codec,
988 item: Item,
989 ) -> Result<(), Codec::Error> {
990 FramedInner::send_with(&mut self.inner.io, codec, &mut self.inner.state, item).await
991 }
992
993 #[inline]
994 async fn flush(&mut self) -> Result<(), Codec::Error> {
995 FramedInner::<_, (), _>::flush(&mut self.inner.io, &mut self.inner.state)
996 .await
997 .map_err(|e| e.into())
998 }
999}
1000
1001impl<Codec: Encoder<Item>, IO: AsyncWriteRent, AnyCodec, Item> SinkWithCodec<Codec, Item>
1002 for Framed<IO, AnyCodec>
1003{
1004 #[inline]
1005 async fn send_with<'a>(
1006 &'a mut self,
1007 codec: &'a mut Codec,
1008 item: Item,
1009 ) -> Result<(), Codec::Error> {
1010 FramedInner::send_with(&mut self.inner.io, codec, &mut self.inner.state, item).await
1011 }
1012
1013 #[inline]
1014 async fn flush(&mut self) -> Result<(), Codec::Error> {
1015 FramedInner::<_, (), _>::flush(&mut self.inner.io, &mut self.inner.state)
1016 .await
1017 .map_err(|e| e.into())
1018 }
1019}
1020
1021impl<IO: AsyncReadRent, Codec> AsyncReadRent for Framed<IO, Codec> {
1022 #[inline]
1023 async fn read<T: IoBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
1024 self.inner.read(buf).await
1025 }
1026
1027 #[inline]
1028 async fn readv<T: IoVecBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
1029 self.inner.readv(buf).await
1030 }
1031}
1032
1033impl<IO: AsyncReadRent, Codec> AsyncReadRent for FramedRead<IO, Codec> {
1034 #[inline]
1035 async fn read<T: IoBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
1036 self.inner.read(buf).await
1037 }
1038
1039 #[inline]
1040 async fn readv<T: IoVecBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
1041 self.inner.readv(buf).await
1042 }
1043}
1044
1045impl<IO: AsyncWriteRent, Codec> AsyncWriteRent for Framed<IO, Codec> {
1046 #[inline]
1047 async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
1048 self.inner.write(buf).await
1049 }
1050
1051 #[inline]
1052 async fn writev<T: IoVecBuf>(&mut self, buf_vec: T) -> BufResult<usize, T> {
1053 self.inner.writev(buf_vec).await
1054 }
1055
1056 #[inline]
1057 async fn flush(&mut self) -> std::io::Result<()> {
1058 self.inner.flush().await
1059 }
1060
1061 #[inline]
1062 async fn shutdown(&mut self) -> std::io::Result<()> {
1063 self.inner.shutdown().await
1064 }
1065}
1066
1067impl<IO: AsyncWriteRent, Codec> AsyncWriteRent for FramedWrite<IO, Codec> {
1068 #[inline]
1069 async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
1070 self.inner.write(buf).await
1071 }
1072
1073 #[inline]
1074 async fn writev<T: IoVecBuf>(&mut self, buf_vec: T) -> BufResult<usize, T> {
1075 self.inner.writev(buf_vec).await
1076 }
1077
1078 #[inline]
1079 async fn flush(&mut self) -> std::io::Result<()> {
1080 self.inner.flush().await
1081 }
1082
1083 #[inline]
1084 async fn shutdown(&mut self) -> std::io::Result<()> {
1085 self.inner.shutdown().await
1086 }
1087}