use futures::Stream;
use std::ops::{Deref, DerefMut};
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::misc::AccountError;
use citadel_io::tokio;
use citadel_types::proto::{
ObjectTransferOrientation, ObjectTransferStatus, VirtualObjectMetadata,
};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
/// Receiving side of the status channel for a single object transfer.
///
/// Wraps the unbounded receiver of [`ObjectTransferStatus`] events that is
/// paired with the sender returned from `ObjectTransferHandler::new`. The
/// type implements `Stream`, so the events can be polled asynchronously.
#[derive(Debug)]
pub struct ObjectTransferHandlerInner {
/// Receiver half of the unbounded status channel.
inner: UnboundedReceiver<ObjectTransferStatus>,
}
/// Handle for observing and controlling one object (file) transfer.
///
/// Bundles the transfer's descriptive data with a stream of
/// [`ObjectTransferStatus`] events and a one-shot "starter" that is used to
/// accept or decline the transfer.
#[derive(Debug)]
pub struct ObjectTransferHandler {
/// Identifier of the transfer's source.
/// NOTE(review): presumably a client/session ID; confirm against callers.
pub source: u64,
/// Identifier of the transfer's receiver.
/// NOTE(review): presumably a client/session ID; confirm against callers.
pub receiver: u64,
/// Metadata describing the object being transferred.
pub metadata: VirtualObjectMetadata,
/// Which side of the transfer this handle represents; checked by
/// `receive_file` (must be `Receiver { .. }`) and `transfer_file` (must be
/// `Sender`).
pub orientation: ObjectTransferOrientation,
/// One-shot sender consumed by `accept`/`decline` to deliver the response.
start_recv_tx: FileTransferStarter,
/// Stream of status events for this transfer.
pub inner: ObjectTransferHandlerInner,
}
/// Holder for the optional one-shot sender that carries the accept/decline
/// decision (`true`/`false`) for a transfer.
///
/// Dereferences to the wrapped `Option`, so callers can `take()` the sender.
/// If the value is dropped while the sender is still present, a warning is
/// logged.
#[derive(Debug)]
pub struct FileTransferStarter {
/// The pending one-shot sender; `None` once taken or when none was supplied.
inner: Option<tokio::sync::oneshot::Sender<bool>>,
}
/// Read-only access to the wrapped optional one-shot sender.
impl Deref for FileTransferStarter {
    type Target = Option<tokio::sync::oneshot::Sender<bool>>;

    /// Borrows the inner `Option` holding the one-shot sender.
    fn deref(&self) -> &Self::Target {
        // Reference pattern: borrows the field, nothing is moved out.
        let Self { inner } = self;
        inner
    }
}
/// Mutable access to the wrapped optional one-shot sender, which is what
/// allows callers to `take()` it.
impl DerefMut for FileTransferStarter {
    /// Mutably borrows the inner `Option` holding the one-shot sender.
    fn deref_mut(&mut self) -> &mut Self::Target {
        // Reference pattern: borrows the field mutably, nothing is moved out.
        let Self { inner } = self;
        inner
    }
}
/// Emits a warning when the starter is dropped while still holding its
/// sender, i.e. nobody took it out before the value went away.
impl Drop for FileTransferStarter {
    fn drop(&mut self) {
        // The sender was already taken (or never provided): nothing to report.
        if self.inner.is_none() {
            return;
        }

        log::warn!(target: "citadel", "FileTransferStarter dropped without being used");
    }
}
/// Exposes the status channel as an asynchronous stream of
/// [`ObjectTransferStatus`] events.
impl Stream for ObjectTransferHandlerInner {
    type Item = ObjectTransferStatus;

    /// Polls the underlying unbounded receiver for the next status event by
    /// delegating to its `poll_recv`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The receiver is `Unpin`, so the pin can be unwrapped to reach the field.
        let receiver = &mut self.get_mut().inner;
        receiver.poll_recv(cx)
    }
}
/// Lets a handler be used wherever its inner status stream is expected.
impl Deref for ObjectTransferHandler {
    type Target = ObjectTransferHandlerInner;

    /// Borrows the inner status stream.
    fn deref(&self) -> &Self::Target {
        let Self { inner, .. } = self;
        inner
    }
}
/// Mutable counterpart of the `Deref` impl: grants mutable access to the
/// inner status stream (needed to poll it).
impl DerefMut for ObjectTransferHandler {
    /// Mutably borrows the inner status stream.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { inner, .. } = self;
        inner
    }
}
impl ObjectTransferHandler {
pub fn new(
source: u64,
receiver: u64,
metadata: VirtualObjectMetadata,
orientation: ObjectTransferOrientation,
start_recv_tx: Option<tokio::sync::oneshot::Sender<bool>>,
) -> (Self, UnboundedSender<ObjectTransferStatus>) {
let (tx, inner) = unbounded_channel();
let this = Self {
inner: ObjectTransferHandlerInner { inner },
source,
receiver,
orientation,
metadata,
start_recv_tx: FileTransferStarter {
inner: start_recv_tx,
},
};
(this, tx)
}
pub async fn exhaust_stream(&mut self) -> Result<Option<PathBuf>, AccountError> {
self.accept()?;
let mut save_path = None;
while let Some(event) = self.inner.inner.recv().await {
match event {
ObjectTransferStatus::ReceptionBeginning(path, _) => {
save_path = Some(path);
}
ObjectTransferStatus::ReceptionComplete => {
return Ok(save_path);
}
ObjectTransferStatus::TransferComplete => {
return Ok(None);
}
ObjectTransferStatus::Fail(err) => {
return Err(citadel_io::error!(
citadel_io::ErrorCode::FileTransferFailed,
err
));
}
_ => {}
}
}
Err(citadel_io::error!(
citadel_io::ErrorCode::FileReceiveStreamEnded
))
}
pub async fn receive_file(&mut self) -> Result<PathBuf, AccountError> {
if !matches!(self.orientation, ObjectTransferOrientation::Receiver { .. }) {
return Err(citadel_io::error!(
citadel_io::ErrorCode::FileReceiveWrongOrientation
));
}
let file = self.exhaust_stream().await?;
file.ok_or_else(|| citadel_io::error!(citadel_io::ErrorCode::FileReceiveNoPath))
}
pub async fn transfer_file(&mut self) -> Result<(), AccountError> {
if !matches!(self.orientation, ObjectTransferOrientation::Sender) {
return Err(citadel_io::error!(
citadel_io::ErrorCode::FileTransferWrongOrientation
));
}
let file = self.exhaust_stream().await?;
if file.is_some() {
Err(citadel_io::error!(
citadel_io::ErrorCode::FileTransferUnexpectedSavePath
))
} else {
Ok(())
}
}
pub fn accept(&mut self) -> Result<(), AccountError> {
self.respond(true)
}
pub fn decline(&mut self) -> Result<(), AccountError> {
self.respond(false)
}
fn respond(&mut self, accept: bool) -> Result<(), AccountError> {
if let Some(tx) = self.start_recv_tx.take() {
tx.send(accept).map_err(|_| {
citadel_io::error!(citadel_io::ErrorCode::FileTransferResponseFailed)
})?;
}
Ok(())
}
}