use std::sync::Arc;
use std::time::Duration;
use bytes::Bytes;
use futures_util::{SinkExt, StreamExt};
use reqwest::Method;
use tokio::sync::mpsc;
use tokio::time;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::protocol::Message;
use crate::client::{Inner, request};
use crate::swarm::{BatchId, Error, PublicKey, Topic};
/// Handle for the node's `pss/*` endpoints (send, subscribe, receive).
///
/// Cloning is cheap: every clone shares the same [`Inner`] through an `Arc`.
#[derive(Debug, Clone)]
pub struct PssApi {
    /// Shared client state used to build and dispatch requests.
    pub(crate) inner: Arc<Inner>,
}
impl PssApi {
pub(crate) fn new(inner: Arc<Inner>) -> Self {
Self { inner }
}
pub async fn send(
&self,
batch_id: &BatchId,
topic: &Topic,
target: &str,
data: impl Into<Bytes>,
recipient: Option<&PublicKey>,
) -> Result<(), Error> {
let path = format!("pss/send/{}/{target}", topic.to_hex());
let mut builder = request(&self.inner, Method::POST, &path)?
.header("Swarm-Postage-Batch-Id", batch_id.to_hex())
.body(data.into());
if let Some(pk) = recipient {
let compressed = pk.compressed_hex()?;
builder = builder.query(&[("recipient", compressed)]);
}
self.inner.send(builder).await?;
Ok(())
}
pub async fn subscribe(&self, topic: &Topic) -> Result<Subscription, Error> {
let path = format!("pss/subscribe/{}", topic.to_hex());
Subscription::open(&self.inner, &path).await
}
pub async fn receive(&self, topic: &Topic, timeout: Duration) -> Result<Bytes, Error> {
let mut sub = self.subscribe(topic).await?;
let result = match time::timeout(timeout, sub.recv()).await {
Ok(Some(b)) => Ok(b),
Ok(None) => Err(Error::argument("pss subscription closed before message")),
Err(_) => Err(Error::argument("pss receive timed out")),
};
sub.cancel();
result
}
}
/// Receiving end of a WebSocket-backed message subscription.
#[derive(Debug)]
pub struct Subscription {
    /// Payloads delivered to the consumer through [`Subscription::recv`].
    rx: mpsc::Receiver<Bytes>,
    /// Cancel signal sender; `None` once cancellation has been requested.
    cancel: Option<mpsc::Sender<()>>,
}
impl Subscription {
pub(crate) async fn open(inner: &Arc<Inner>, path: &str) -> Result<Self, Error> {
let mut url = inner.url(path)?;
let scheme = match url.scheme() {
"http" => "ws",
"https" => "wss",
other => {
return Err(Error::argument(format!(
"unsupported base URL scheme for websocket: {other}"
)));
}
};
url.set_scheme(scheme)
.map_err(|_| Error::argument("failed to set websocket scheme"))?;
let (ws, _resp) = connect_async(url.as_str())
.await
.map_err(|e| Error::argument(format!("websocket connect: {e}")))?;
let (mut sink, mut stream) = ws.split();
let (msg_tx, msg_rx) = mpsc::channel::<Bytes>(64);
let (cancel_tx, mut cancel_rx) = mpsc::channel::<()>(1);
tokio::spawn(async move {
loop {
tokio::select! {
_ = cancel_rx.recv() => {
let _ = sink.send(Message::Close(None)).await;
let _ = sink.close().await;
break;
}
msg = stream.next() => {
match msg {
Some(Ok(Message::Binary(b))) => {
if msg_tx.send(Bytes::from(b)).await.is_err() {
break;
}
}
Some(Ok(Message::Text(t))) => {
if msg_tx
.send(Bytes::from(t.into_bytes()))
.await
.is_err()
{
break;
}
}
Some(Ok(Message::Close(_))) | None => break,
Some(Ok(_)) => continue, Some(Err(_)) => break,
}
}
}
}
});
Ok(Self {
rx: msg_rx,
cancel: Some(cancel_tx),
})
}
pub async fn recv(&mut self) -> Option<Bytes> {
self.rx.recv().await
}
pub fn cancel(&mut self) {
if let Some(tx) = self.cancel.take() {
let _ = tx.try_send(());
}
}
}
impl Drop for Subscription {
fn drop(&mut self) {
self.cancel();
}
}