cdrs_async/
frame_channel.rs1use std::{
2 io,
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7use async_std::{
8 io::{IoSlice, Write},
9 prelude::*,
10};
11use cassandra_proto::frame::{parser_async::parse_frame_async, Frame, IntoBytes};
12use futures::{sink::Sink, stream::Stream};
13use log::error;
14
15use crate::{compressor::Compression, transport::CDRSTransport};
16
17const READING_BUFFER_SIZE: usize = 1_000;
18
19pub struct FrameChannel<T> {
21 transport: T,
22 sending_buffer: Vec<u8>,
23 receving_buffer: Vec<u8>,
24 compressor: Compression,
25 is_terminated: bool,
26}
27
28impl<T> FrameChannel<T> {
29 pub fn new(transport: T, compressor: Compression) -> FrameChannel<T> {
30 FrameChannel {
31 transport,
32 sending_buffer: Vec::with_capacity(8_000),
33 receving_buffer: Vec::with_capacity(8_000),
34 compressor,
35 is_terminated: false,
36 }
37 }
38}
39
40impl<T: CDRSTransport> Sink<Frame> for FrameChannel<T> {
41 type Error = io::Error;
42
43 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
44 cx.waker().wake_by_ref();
45
46 Poll::Ready(Ok(()))
47 }
48
49 fn start_send(self: Pin<&mut Self>, item: Frame) -> Result<(), Self::Error> {
50 self
51 .get_mut()
52 .sending_buffer
53 .extend_from_slice(&item.into_cbytes());
54
55 Ok(())
56 }
57
58 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
59 let buff = self.sending_buffer.split_off(0);
60 let mut transport = Pin::new(&mut self.transport);
61
62 transport.write(&buff);
63 println!("before poll flushing");
64 let p = transport.poll_flush(cx);
65
66 p
67 }
68
69 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
70 Pin::new(&mut self.transport).poll_close(cx)
71 }
72}
73
74impl<T: CDRSTransport> Write for FrameChannel<T> {
75 fn poll_write(
76 mut self: Pin<&mut Self>,
77 cx: &mut Context<'_>,
78 buf: &[u8],
79 ) -> Poll<io::Result<usize>> {
80 Pin::new(&mut self.transport).poll_write(cx, buf)
81 }
82
83 fn poll_write_vectored(
84 mut self: Pin<&mut Self>,
85 cx: &mut Context<'_>,
86 bufs: &[IoSlice<'_>],
87 ) -> Poll<io::Result<usize>> {
88 Pin::new(&mut self.transport).poll_write_vectored(cx, bufs)
89 }
90
91 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
92 Pin::new(&mut self.transport).poll_flush(cx)
93 }
94
95 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
96 Pin::new(&mut self.transport).poll_close(cx)
97 }
98}
99
100impl<T: CDRSTransport> Stream for FrameChannel<T> {
101 type Item = Frame;
102
103 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
104 let compressor = self.compressor;
105 let transport = Pin::new(&mut self.transport);
106 let mut buffer_slice = [0u8; READING_BUFFER_SIZE];
107
108 match transport.poll_read(cx, &mut buffer_slice) {
109 Poll::Ready(result) => match result {
110 Ok(n) => {
111 self.receving_buffer.extend_from_slice(&buffer_slice[0..n]);
112 if n == READING_BUFFER_SIZE || n == 0 {
113 return Poll::Pending;
114 } else {
115 }
117 }
118 Err(err) => {
119 error!("CDRS frame_channel: {:?}", err);
120 self.is_terminated = true;
121 return Poll::Ready(None);
122 }
123 },
124 Poll::Pending => {
125 return Poll::Pending;
126 }
127 }
128
129 let mut buffer_cursor = io::Cursor::new(&mut self.receving_buffer);
130
131 match parse_frame_async(&mut buffer_cursor, &compressor) {
132 Err(err) => {
133 error!("CDRS frame_channel: parse frame error {:?}", err);
134 self.is_terminated = true;
135 return Poll::Ready(None);
136 }
137 Ok(Some(frame)) => {
138 let cursor_position = buffer_cursor.position();
139 self.receving_buffer = buffer_cursor
140 .into_inner()
141 .split_off(cursor_position as usize);
142
143 return Poll::Ready(Some(frame));
144 }
145 Ok(None) => {
146 return Poll::Pending;
147 }
148 }
149 }
150}