cdrs_async/
frame_channel.rs

1use std::{
2  io,
3  pin::Pin,
4  task::{Context, Poll},
5};
6
7use async_std::{
8  io::{IoSlice, Write},
9  prelude::*,
10};
11use cassandra_proto::frame::{parser_async::parse_frame_async, Frame, IntoBytes};
12use futures::{sink::Sink, stream::Stream};
13use log::error;
14
15use crate::{compressor::Compression, transport::CDRSTransport};
16
17const READING_BUFFER_SIZE: usize = 1_000;
18
19/// Async channel that enable frame exchange with DB server.
20pub struct FrameChannel<T> {
21  transport: T,
22  sending_buffer: Vec<u8>,
23  receving_buffer: Vec<u8>,
24  compressor: Compression,
25  is_terminated: bool,
26}
27
28impl<T> FrameChannel<T> {
29  pub fn new(transport: T, compressor: Compression) -> FrameChannel<T> {
30    FrameChannel {
31      transport,
32      sending_buffer: Vec::with_capacity(8_000),
33      receving_buffer: Vec::with_capacity(8_000),
34      compressor,
35      is_terminated: false,
36    }
37  }
38}
39
40impl<T: CDRSTransport> Sink<Frame> for FrameChannel<T> {
41  type Error = io::Error;
42
43  fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
44    cx.waker().wake_by_ref();
45
46    Poll::Ready(Ok(()))
47  }
48
49  fn start_send(self: Pin<&mut Self>, item: Frame) -> Result<(), Self::Error> {
50    self
51      .get_mut()
52      .sending_buffer
53      .extend_from_slice(&item.into_cbytes());
54
55    Ok(())
56  }
57
58  fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
59    let buff = self.sending_buffer.split_off(0);
60    let mut transport = Pin::new(&mut self.transport);
61
62    transport.write(&buff);
63    println!("before poll flushing");
64    let p = transport.poll_flush(cx);
65
66    p
67  }
68
69  fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
70    Pin::new(&mut self.transport).poll_close(cx)
71  }
72}
73
74impl<T: CDRSTransport> Write for FrameChannel<T> {
75  fn poll_write(
76    mut self: Pin<&mut Self>,
77    cx: &mut Context<'_>,
78    buf: &[u8],
79  ) -> Poll<io::Result<usize>> {
80    Pin::new(&mut self.transport).poll_write(cx, buf)
81  }
82
83  fn poll_write_vectored(
84    mut self: Pin<&mut Self>,
85    cx: &mut Context<'_>,
86    bufs: &[IoSlice<'_>],
87  ) -> Poll<io::Result<usize>> {
88    Pin::new(&mut self.transport).poll_write_vectored(cx, bufs)
89  }
90
91  fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
92    Pin::new(&mut self.transport).poll_flush(cx)
93  }
94
95  fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
96    Pin::new(&mut self.transport).poll_close(cx)
97  }
98}
99
100impl<T: CDRSTransport> Stream for FrameChannel<T> {
101  type Item = Frame;
102
103  fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
104    let compressor = self.compressor;
105    let transport = Pin::new(&mut self.transport);
106    let mut buffer_slice = [0u8; READING_BUFFER_SIZE];
107
108    match transport.poll_read(cx, &mut buffer_slice) {
109      Poll::Ready(result) => match result {
110        Ok(n) => {
111          self.receving_buffer.extend_from_slice(&buffer_slice[0..n]);
112          if n == READING_BUFFER_SIZE || n == 0 {
113            return Poll::Pending;
114          } else {
115            // n < READING_BUFFER_SIZE means the function can proceed further
116          }
117        }
118        Err(err) => {
119          error!("CDRS frame_channel: {:?}", err);
120          self.is_terminated = true;
121          return Poll::Ready(None);
122        }
123      },
124      Poll::Pending => {
125        return Poll::Pending;
126      }
127    }
128
129    let mut buffer_cursor = io::Cursor::new(&mut self.receving_buffer);
130
131    match parse_frame_async(&mut buffer_cursor, &compressor) {
132      Err(err) => {
133        error!("CDRS frame_channel: parse frame error {:?}", err);
134        self.is_terminated = true;
135        return Poll::Ready(None);
136      }
137      Ok(Some(frame)) => {
138        let cursor_position = buffer_cursor.position();
139        self.receving_buffer = buffer_cursor
140          .into_inner()
141          .split_off(cursor_position as usize);
142
143        return Poll::Ready(Some(frame));
144      }
145      Ok(None) => {
146        return Poll::Pending;
147      }
148    }
149  }
150}