use core::{
pin::Pin,
task::{Poll, Context}
};
use crate::{backend::{self, Encode, PollEncodeStatus}, io};
pub fn serializer<F, W>(format: F, writer: W) -> Serializer<F, W> {
Serializer::new(format, writer)
}
pub struct Serializer<F, W> {
pub format: F,
pub writer: W,
}
impl<F, W> Serializer<F, W> {
pub fn new(format: F, writer: W) -> Serializer<F, W> {
Serializer {
format,
writer,
}
}
pub fn into_sink<D>(self) -> Serialize<F, W, D>
where
F: backend::FormatEncode,
W: io::AsyncWrite + Unpin,
D: backend::Encodable,
{
Serialize::new(self)
}
pub fn serialize<'w, D>(&'w mut self, data: &'w D) -> D::Future<'w, F, W>
where
F: backend::FormatSerialize,
W: io::AsyncWrite + Unpin,
D: backend::AsyncSerialize,
{
D::serialize(data, &self.format, &mut self.writer)
}
}
enum State<F, D>
where
F: backend::FormatEncode,
D: backend::Encodable,
{
Ready,
Pending(D::Encoder<F>, D),
Error,
Closed,
}
pub struct Serialize<F, W, D>
where
F: backend::FormatEncode,
D: backend::Encodable,
{
serializer: Serializer<F, W>,
state: State<F, D>,
}
impl<F, W, D> Serialize<F, W, D>
where
F: backend::FormatEncode,
W: io::AsyncWrite + Unpin,
D: backend::Encodable,
{
pub fn new(serializer: Serializer<F, W>) -> Self
where
W: futures::AsyncWrite + Unpin,
{
Self {
serializer,
state: State::Ready,
}
}
pub fn is_ready(&self) -> bool {
matches!(self.state, State::Ready)
}
pub fn try_into_inner(self) -> Result<Serializer<F, W>, F> {
if let State::Ready = self.state {
Ok(self.serializer)
} else {
Err(self.serializer.format)
}
}
}
impl<F, W, D> Unpin for Serialize<F, W, D>
where
F: backend::FormatEncode,
D: backend::Encodable,
{}
impl<F, W, D> futures::Sink<D> for Serialize<F, W, D>
where
F: backend::FormatEncode,
W: io::AsyncWrite + Unpin,
D: backend::Encodable,
{
type Error = F::Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let Self {
serializer,
state,
} = &mut *self;
match state {
State::Ready => Poll::Ready(Ok(())),
State::Pending(enc, data) => match enc.poll_encode(&serializer.format, &mut serializer.writer, data, cx) {
PollEncodeStatus::Fini => {
*state = State::Ready;
Poll::Ready(Ok(()))
}
PollEncodeStatus::Pending => Poll::Pending,
PollEncodeStatus::Error(e) => {
*state = State::Error;
Poll::Ready(Err(e))
}
}
State::Error => Poll::Ready(Err(<F as backend::Format>::invalid_input_err())),
State::Closed => Poll::Ready(Ok(())),
}
}
fn start_send(mut self: Pin<&mut Self>, item: D) -> Result<(), Self::Error> {
let state = &mut self.state;
if let State::Ready = state {
*state = State::Pending(<D::Encoder<F> as Encode>::init(&item), item);
Ok(())
} else {
Err(<F as backend::Format>::invalid_input_err())
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
futures::ready!(Pin::new(&mut *self).poll_ready(cx))?;
match futures::ready!(Pin::new(&mut self.serializer.writer).poll_flush(cx)) {
Ok(()) => Poll::Ready(Ok(())),
Err(e) => {
self.state = State::Error;
Poll::Ready(Err(e.into()))
}
}
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
futures::ready!(Pin::new(&mut *self).poll_ready(cx))?;
match futures::ready!(Pin::new(&mut self.serializer.writer).poll_close(cx)) {
Ok(()) => {
self.state = State::Closed;
Poll::Ready(Ok(()))
}
Err(e) => {
self.state = State::Error;
Poll::Ready(Err(e.into()))
}
}
}
}