use std::fmt::{self, Debug, Formatter};
use futures_util::StreamExt;
use quinn::{RecvStream, SendStream};
use serde::{de::DeserializeOwned, Serialize};
use super::ReceiverStream;
use crate::{Error, Receiver, Result, Sender};
#[must_use = "`Incoming` does nothing unless accepted with `Incoming::accept`"]
pub struct Incoming<T: DeserializeOwned> {
sender: SendStream,
receiver: ReceiverStream<T>,
r#type: Option<Result<T>>,
}
impl<T: DeserializeOwned> Debug for Incoming<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Incoming")
.field("sender", &self.sender)
.field("receiver", &"ReceiverStream<T>")
.field("type", &"Option<Result<T>>")
.finish()
}
}
impl<T: DeserializeOwned> Incoming<T> {
pub(super) fn new(sender: SendStream, receiver: RecvStream) -> Self {
Self {
sender,
receiver: ReceiverStream::new(receiver),
r#type: None,
}
}
#[allow(unused_lifetimes)]
pub async fn r#type(&mut self) -> Result<&T, &Error> {
if let Some(ref r#type) = self.r#type {
r#type.as_ref()
} else {
let r#type = self.receiver.next().await.unwrap_or(Err(Error::NoType));
self.r#type = Some(r#type);
#[allow(clippy::expect_used)]
self.r#type
.as_ref()
.expect("`type` just inserted missing")
.as_ref()
}
}
pub async fn accept<
S: DeserializeOwned + Serialize + Send + 'static,
R: DeserializeOwned + Serialize + Send + 'static,
>(
mut self,
) -> Result<(Sender<S>, Receiver<R>)> {
match self.r#type {
Some(Ok(_)) => (),
Some(Err(error)) => return Err(error),
None => {
let _type = self.receiver.next().await.unwrap_or(Err(Error::NoType))?;
}
}
let sender = Sender::new(self.sender);
let receiver = Receiver::new(self.receiver.transmute());
Ok((sender, receiver))
}
}