1use std::sync::Arc;
7use std::time::Duration;
8
9use bytes::Bytes;
10use futures_util::{SinkExt, StreamExt};
11use reqwest::Method;
12use tokio::sync::mpsc;
13use tokio::time;
14use tokio_tungstenite::connect_async;
15use tokio_tungstenite::tungstenite::protocol::Message;
16
17use crate::client::{Inner, request};
18use crate::swarm::{BatchId, Error, PublicKey, Topic};
19
20#[derive(Clone, Debug)]
22pub struct PssApi {
23 pub(crate) inner: Arc<Inner>,
24}
25
26impl PssApi {
27 pub(crate) fn new(inner: Arc<Inner>) -> Self {
28 Self { inner }
29 }
30
31 pub async fn send(
42 &self,
43 batch_id: &BatchId,
44 topic: &Topic,
45 target: &str,
46 data: impl Into<Bytes>,
47 recipient: Option<&PublicKey>,
48 ) -> Result<(), Error> {
49 let path = format!("pss/send/{}/{target}", topic.to_hex());
50 let mut builder = request(&self.inner, Method::POST, &path)?
51 .header("Swarm-Postage-Batch-Id", batch_id.to_hex())
52 .body(data.into());
53 if let Some(pk) = recipient {
54 let compressed = pk.compressed_hex()?;
55 builder = builder.query(&[("recipient", compressed)]);
56 }
57 self.inner.send(builder).await?;
58 Ok(())
59 }
60
61 pub async fn subscribe(&self, topic: &Topic) -> Result<Subscription, Error> {
65 let path = format!("pss/subscribe/{}", topic.to_hex());
66 Subscription::open(&self.inner, &path).await
67 }
68
69 pub async fn receive(&self, topic: &Topic, timeout: Duration) -> Result<Bytes, Error> {
72 let mut sub = self.subscribe(topic).await?;
73 let result = match time::timeout(timeout, sub.recv()).await {
74 Ok(Some(b)) => Ok(b),
75 Ok(None) => Err(Error::argument("pss subscription closed before message")),
76 Err(_) => Err(Error::argument("pss receive timed out")),
77 };
78 sub.cancel();
79 result
80 }
81}
82
83#[derive(Debug)]
86pub struct Subscription {
87 rx: mpsc::Receiver<Bytes>,
88 cancel: Option<mpsc::Sender<()>>,
89}
90
91impl Subscription {
92 pub(crate) async fn open(inner: &Arc<Inner>, path: &str) -> Result<Self, Error> {
95 let mut url = inner.url(path)?;
96 let scheme = match url.scheme() {
97 "http" => "ws",
98 "https" => "wss",
99 other => {
100 return Err(Error::argument(format!(
101 "unsupported base URL scheme for websocket: {other}"
102 )));
103 }
104 };
105 url.set_scheme(scheme)
106 .map_err(|_| Error::argument("failed to set websocket scheme"))?;
107
108 let (ws, _resp) = connect_async(url.as_str())
109 .await
110 .map_err(|e| Error::argument(format!("websocket connect: {e}")))?;
111 let (mut sink, mut stream) = ws.split();
112
113 let (msg_tx, msg_rx) = mpsc::channel::<Bytes>(64);
114 let (cancel_tx, mut cancel_rx) = mpsc::channel::<()>(1);
115
116 tokio::spawn(async move {
117 loop {
118 tokio::select! {
119 _ = cancel_rx.recv() => {
120 let _ = sink.send(Message::Close(None)).await;
121 let _ = sink.close().await;
122 break;
123 }
124 msg = stream.next() => {
125 match msg {
126 Some(Ok(Message::Binary(b))) => {
127 if msg_tx.send(Bytes::from(b)).await.is_err() {
128 break;
129 }
130 }
131 Some(Ok(Message::Text(t))) => {
132 if msg_tx
133 .send(Bytes::from(t.into_bytes()))
134 .await
135 .is_err()
136 {
137 break;
138 }
139 }
140 Some(Ok(Message::Close(_))) | None => break,
141 Some(Ok(_)) => continue, Some(Err(_)) => break,
143 }
144 }
145 }
146 }
147 });
148
149 Ok(Self {
150 rx: msg_rx,
151 cancel: Some(cancel_tx),
152 })
153 }
154
155 pub async fn recv(&mut self) -> Option<Bytes> {
158 self.rx.recv().await
159 }
160
161 pub fn cancel(&mut self) {
163 if let Some(tx) = self.cancel.take() {
164 let _ = tx.try_send(());
165 }
166 }
167}
168
169impl Drop for Subscription {
170 fn drop(&mut self) {
171 self.cancel();
172 }
173}