use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use wasm_smtp::{IoError, StartTlsCapable, Transport};
use worker::Socket;
pub struct CloudflareTransport {
socket: Option<Socket>,
}
impl CloudflareTransport {
#[must_use]
pub fn from_socket(socket: Socket) -> Self {
Self {
socket: Some(socket),
}
}
#[must_use]
pub fn into_inner(self) -> Option<Socket> {
self.socket
}
fn socket_mut(&mut self) -> Result<&mut Socket, IoError> {
self.socket
.as_mut()
.ok_or_else(|| IoError::new("transport socket is missing (interrupted upgrade?)"))
}
}
impl core::fmt::Debug for CloudflareTransport {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("CloudflareTransport")
.field("has_socket", &self.socket.is_some())
.finish_non_exhaustive()
}
}
impl Transport for CloudflareTransport {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> {
let socket = self.socket_mut()?;
read_async_io(socket, buf).await
}
async fn write_all(&mut self, buf: &[u8]) -> Result<(), IoError> {
let socket = self.socket_mut()?;
write_all_async_io(socket, buf).await
}
async fn close(&mut self) -> Result<(), IoError> {
let socket = self.socket_mut()?;
socket
.close()
.await
.map_err(|e| IoError::new(format!("socket close failed: {e}")))
}
}
impl StartTlsCapable for CloudflareTransport {
async fn upgrade_to_tls(&mut self) -> Result<(), IoError> {
let socket = self
.socket
.take()
.ok_or_else(|| IoError::new("transport socket is missing"))?;
let upgraded = socket.start_tls();
self.socket = Some(upgraded);
Ok(())
}
}
pub(crate) async fn read_async_io<S>(stream: &mut S, buf: &mut [u8]) -> Result<usize, IoError>
where
S: AsyncRead + Unpin,
{
AsyncReadExt::read(stream, buf)
.await
.map_err(|e| IoError::new(format!("read failed: {e}")))
}
pub(crate) async fn write_all_async_io<S>(stream: &mut S, buf: &[u8]) -> Result<(), IoError>
where
S: AsyncWrite + Unpin,
{
AsyncWriteExt::write_all(stream, buf)
.await
.map_err(|e| IoError::new(format!("write failed: {e}")))
}