use std::{fmt::Display, str::FromStr, time::Duration};
use base64::{Engine, engine::general_purpose::URL_SAFE_NO_PAD};
use pubky_common::crypto::hash;
use reqwest::{Method, StatusCode};
use url::Url;
use crate::{PubkyHttpClient, cross_log, util::check_http_status};
pub const DEFAULT_HTTP_RELAY_INBOX: &str = "https://httprelay.pubky.app/inbox";
#[derive(Debug)]
enum PollError {
Timeout,
Failure(crate::errors::Error),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HttpRelayInboxChannel {
base_url: Url,
channel_id: String,
}
impl HttpRelayInboxChannel {
pub fn new(base_url: Url, channel_id: String) -> crate::errors::Result<Self> {
if base_url.cannot_be_a_base() {
return Err(crate::errors::Error::Parse(
url::ParseError::RelativeUrlWithCannotBeABaseBase,
));
}
if channel_id.is_empty() {
return Err(crate::errors::AuthError::Validation(
"channel_id must not be empty".into(),
)
.into());
}
Ok(Self {
base_url,
channel_id,
})
}
#[cfg(test)]
pub fn base_url(&self) -> &Url {
&self.base_url
}
#[must_use]
pub fn to_url(&self) -> Url {
let mut url = self.base_url.clone();
let mut segs = url
.path_segments_mut()
.expect("Always valid base url because it's been checked in new");
segs.pop_if_empty(); segs.push(&self.channel_id);
drop(segs);
url
}
fn ack_url(&self) -> Url {
let mut url = self.to_url();
url.path_segments_mut()
.expect("Always valid base url")
.push("ack");
url
}
fn await_url(&self) -> Url {
let mut url = self.to_url();
url.path_segments_mut()
.expect("Always valid base url")
.push("await");
url
}
async fn poll_once(
&self,
client: &PubkyHttpClient,
timeout: Option<Duration>,
) -> std::result::Result<reqwest::Response, PollError> {
let request = client
.cross_request(Method::GET, self.to_url())
.await
.map_err(PollError::Failure)?;
let request = match timeout {
Some(timeout) => request.timeout(timeout),
None => request,
};
let response = match request.send().await {
Ok(response) => response,
Err(err) if err.is_timeout() => return Err(PollError::Timeout),
Err(err) => return Err(PollError::Failure(err.into())),
};
if response.status() == StatusCode::REQUEST_TIMEOUT {
return Err(PollError::Timeout);
}
let response = match check_http_status(response).await {
Ok(response) => response,
Err(e) => return Err(PollError::Failure(e)),
};
Ok(response)
}
pub async fn poll(
&self,
client: &PubkyHttpClient,
timeout: Option<Duration>,
) -> crate::errors::Result<Option<Vec<u8>>> {
const MAX_FAILURES: usize = 3;
let start = web_time::Instant::now();
let mut attempt = 0;
let mut consecutive_failures = 0;
loop {
attempt += 1;
if let Some(timeout) = timeout
&& start.elapsed() >= timeout
{
return Ok(None);
}
let poll_timeout = timeout.map(|t| t.checked_sub(start.elapsed()).unwrap_or_default());
match self.poll_once(client, poll_timeout).await {
Ok(response) => {
cross_log!(
debug,
"Received response for http relay inbox channel polling attempt {attempt}: status {}",
response.status()
);
return Ok(Some(response.bytes().await?.to_vec()));
}
Err(e) => match e {
PollError::Timeout => {
consecutive_failures = 0;
}
PollError::Failure(e) => {
consecutive_failures += 1;
cross_log!(
error,
"Http relay inbox channel polling attempt {attempt} failed at {}: {e}",
self
);
if consecutive_failures >= MAX_FAILURES {
return Err(e);
}
}
},
}
}
}
pub async fn produce(
&self,
client: &PubkyHttpClient,
body: &[u8],
) -> crate::errors::Result<()> {
let request = client.cross_request(Method::POST, self.to_url()).await?;
let request = request.body(body.to_vec());
let response = request.send().await?;
check_http_status(response).await?;
Ok(())
}
pub async fn ack(&self, client: &PubkyHttpClient) -> crate::errors::Result<bool> {
let request = client.cross_request(Method::DELETE, self.to_url()).await?;
let response = request.send().await?;
match response.status() {
StatusCode::OK => Ok(true),
StatusCode::NOT_FOUND => Ok(false),
_ => {
check_http_status(response).await?;
Ok(false)
}
}
}
pub async fn check_ack(&self, client: &PubkyHttpClient) -> crate::errors::Result<Option<bool>> {
let request = client.cross_request(Method::GET, self.ack_url()).await?;
let response = request.send().await?;
match response.status() {
StatusCode::OK => {
let body = response.text().await?;
Ok(Some(body.trim() == "true"))
}
StatusCode::NOT_FOUND => Ok(None),
_ => {
check_http_status(response).await?;
Ok(None)
}
}
}
pub async fn await_ack(
&self,
client: &PubkyHttpClient,
timeout: Option<Duration>,
) -> crate::errors::Result<Option<bool>> {
let request = client.cross_request(Method::GET, self.await_url()).await?;
let request = match timeout {
Some(timeout) => request.timeout(timeout),
None => request,
};
let response = match request.send().await {
Ok(response) => response,
Err(err) if err.is_timeout() => return Ok(Some(false)),
Err(err) => return Err(err.into()),
};
match response.status() {
StatusCode::OK => Ok(Some(true)),
StatusCode::REQUEST_TIMEOUT => Ok(Some(false)),
StatusCode::NOT_FOUND => Ok(None),
_ => {
check_http_status(response).await?;
Ok(None)
}
}
}
}
impl Display for HttpRelayInboxChannel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.to_url())
}
}
impl FromStr for HttpRelayInboxChannel {
type Err = crate::errors::Error;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
let mut url = Url::parse(s).map_err(crate::errors::Error::Parse)?;
let mut segments = url.path_segments().ok_or(crate::errors::Error::Parse(
url::ParseError::RelativeUrlWithCannotBeABaseBase,
))?;
let channel_id = segments
.next_back()
.ok_or(crate::errors::Error::Parse(
url::ParseError::RelativeUrlWithCannotBeABaseBase,
))?
.to_string();
if channel_id.is_empty() {
return Err(crate::errors::AuthError::Validation(
"channel_id must not be empty".into(),
)
.into());
}
url.path_segments_mut()
.expect("Always valid url because it's been checked in parse")
.pop();
Self::new(url, channel_id)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EncryptedHttpRelayInboxChannel {
channel: HttpRelayInboxChannel,
secret: [u8; 32],
}
impl EncryptedHttpRelayInboxChannel {
pub fn new(relay_base_url: Url, secret: [u8; 32]) -> crate::errors::Result<Self> {
let channel_id = URL_SAFE_NO_PAD.encode(hash(&secret).as_bytes());
let channel = HttpRelayInboxChannel::new(relay_base_url, channel_id)?;
Ok(Self { channel, secret })
}
#[cfg(test)]
pub fn random_secret(relay_base_url: Url) -> crate::errors::Result<Self> {
use pubky_common::crypto::random_bytes;
let secret = random_bytes::<32>();
Self::new(relay_base_url, secret)
}
#[cfg(test)]
pub fn secret(&self) -> &[u8; 32] {
&self.secret
}
pub async fn produce(
&self,
client: &PubkyHttpClient,
body: &[u8],
) -> crate::errors::Result<()> {
let encrypted = pubky_common::crypto::encrypt(body, &self.secret);
self.channel.produce(client, &encrypted).await
}
pub async fn poll(
&self,
client: &PubkyHttpClient,
timeout: Option<Duration>,
) -> crate::errors::Result<Option<Vec<u8>>> {
let Some(response) = self.channel.poll(client, timeout).await? else {
return Ok(None);
};
let decrypted = pubky_common::crypto::decrypt(&response, &self.secret)?;
Ok(Some(decrypted))
}
pub async fn ack(&self, client: &PubkyHttpClient) -> crate::errors::Result<bool> {
self.channel.ack(client).await
}
pub async fn check_ack(&self, client: &PubkyHttpClient) -> crate::errors::Result<Option<bool>> {
self.channel.check_ack(client).await
}
pub async fn await_ack(
&self,
client: &PubkyHttpClient,
timeout: Option<Duration>,
) -> crate::errors::Result<Option<bool>> {
self.channel.await_ack(client, timeout).await
}
}
impl Display for EncryptedHttpRelayInboxChannel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.channel.to_url())
}
}
#[cfg(test)]
mod tests {
use pubky_common::crypto::random_bytes;
use super::*;
#[test]
fn test_new() {
let base_url = Url::parse(DEFAULT_HTTP_RELAY_INBOX).unwrap();
let channel = HttpRelayInboxChannel::new(base_url, "1234567890".to_string()).unwrap();
assert_eq!(
channel.to_url().as_str(),
"https://httprelay.pubky.app/inbox/1234567890"
);
}
#[test]
fn test_from_str() {
let channel = "https://httprelay.pubky.app/inbox/1234567890"
.parse::<HttpRelayInboxChannel>()
.unwrap();
assert_eq!(channel.base_url.as_str(), DEFAULT_HTTP_RELAY_INBOX);
assert_eq!(channel.channel_id, "1234567890");
}
#[test]
fn test_from_str_missing_channel_id() {
match "https://httprelay.pubky.app/".parse::<HttpRelayInboxChannel>() {
Ok(_) => {
panic!("Should error because missing channel id");
}
Err(e) => {
assert!(
matches!(
e,
crate::errors::Error::Authentication(crate::errors::AuthError::Validation(
_
))
),
"Expected validation error, got {e:?}"
);
}
}
}
#[test]
fn test_sub_urls() {
let base_url = Url::parse(DEFAULT_HTTP_RELAY_INBOX).unwrap();
let channel = HttpRelayInboxChannel::new(base_url, "test123".to_string()).unwrap();
assert_eq!(
channel.ack_url().as_str(),
"https://httprelay.pubky.app/inbox/test123/ack"
);
assert_eq!(
channel.await_url().as_str(),
"https://httprelay.pubky.app/inbox/test123/await"
);
}
async fn start_relay() -> (http_relay::HttpRelay, Url) {
let relay = http_relay::HttpRelay::builder()
.http_port(0)
.run()
.await
.unwrap();
let inbox_base = relay.local_url().join("inbox").unwrap();
(relay, inbox_base)
}
fn random_channel(inbox_base: &Url) -> HttpRelayInboxChannel {
let channel_bytes = random_bytes::<32>();
let channel_id = URL_SAFE_NO_PAD.encode(channel_bytes);
HttpRelayInboxChannel::new(inbox_base.clone(), channel_id).unwrap()
}
#[tokio::test]
async fn test_produce_and_poll() {
let (_relay, inbox_base) = start_relay().await;
let client = PubkyHttpClient::new().unwrap();
let channel = random_channel(&inbox_base);
channel.produce(&client, b"Hello, inbox!").await.unwrap();
let response = channel
.poll(&client, Some(Duration::from_secs(5)))
.await
.unwrap()
.unwrap();
assert_eq!(response, b"Hello, inbox!");
}
#[tokio::test]
async fn test_idempotent_get_until_ack() {
let (_relay, inbox_base) = start_relay().await;
let client = PubkyHttpClient::new().unwrap();
let channel = random_channel(&inbox_base);
channel.produce(&client, b"persistent msg").await.unwrap();
let resp1 = channel
.poll(&client, Some(Duration::from_secs(5)))
.await
.unwrap()
.unwrap();
let resp2 = channel
.poll(&client, Some(Duration::from_secs(5)))
.await
.unwrap()
.unwrap();
assert_eq!(resp1, resp2);
let acked = channel.ack(&client).await.unwrap();
assert!(acked);
let resp3 = channel
.poll(&client, Some(Duration::from_millis(500)))
.await
.unwrap();
assert!(resp3.is_none());
}
#[tokio::test]
async fn test_ack_nonexistent() {
let (_relay, inbox_base) = start_relay().await;
let client = PubkyHttpClient::new().unwrap();
let channel = random_channel(&inbox_base);
let acked = channel.ack(&client).await.unwrap();
assert!(!acked);
}
#[tokio::test]
async fn test_check_ack() {
let (_relay, inbox_base) = start_relay().await;
let client = PubkyHttpClient::new().unwrap();
let channel = random_channel(&inbox_base);
channel.produce(&client, b"check me").await.unwrap();
let status = channel.check_ack(&client).await.unwrap();
assert_eq!(status, Some(false));
channel.ack(&client).await.unwrap();
let status = channel.check_ack(&client).await.unwrap();
assert_eq!(status, Some(true));
}
#[tokio::test]
async fn test_await_ack() {
let (_relay, inbox_base) = start_relay().await;
let client = PubkyHttpClient::new().unwrap();
let channel = random_channel(&inbox_base);
channel.produce(&client, b"await me").await.unwrap();
let channel_clone = channel.clone();
let ack_handle = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(500)).await;
let client = PubkyHttpClient::new().unwrap();
channel_clone.ack(&client).await.unwrap();
});
let result = channel
.await_ack(&client, Some(Duration::from_secs(10)))
.await
.unwrap();
assert_eq!(result, Some(true));
ack_handle.await.unwrap();
}
#[tokio::test]
async fn test_poll_timeout_no_message() {
let (_relay, inbox_base) = start_relay().await;
let client = PubkyHttpClient::new().unwrap();
let channel = random_channel(&inbox_base);
let result = channel
.poll(&client, Some(Duration::from_millis(500)))
.await
.unwrap();
assert!(result.is_none());
}
#[tokio::test]
async fn test_encrypted_produce_poll_ack() {
let (_relay, inbox_base) = start_relay().await;
let encrypted_channel = EncryptedHttpRelayInboxChannel::random_secret(inbox_base).unwrap();
let client = PubkyHttpClient::new().unwrap();
encrypted_channel
.produce(&client, b"encrypted inbox msg")
.await
.unwrap();
let response = encrypted_channel
.poll(&client, Some(Duration::from_secs(5)))
.await
.unwrap()
.unwrap();
assert_eq!(response, b"encrypted inbox msg");
let acked = encrypted_channel.ack(&client).await.unwrap();
assert!(acked);
}
#[tokio::test]
async fn test_check_ack_nonexistent_channel() {
let (_relay, inbox_base) = start_relay().await;
let client = PubkyHttpClient::new().unwrap();
let channel = random_channel(&inbox_base);
let status = channel.check_ack(&client).await.unwrap();
assert_eq!(status, None);
}
#[tokio::test]
async fn test_await_ack_nonexistent_channel() {
let (_relay, inbox_base) = start_relay().await;
let client = PubkyHttpClient::new().unwrap();
let channel = random_channel(&inbox_base);
let result = channel
.await_ack(&client, Some(Duration::from_secs(2)))
.await
.unwrap();
assert_eq!(result, None);
}
#[tokio::test]
async fn test_await_ack_server_timeout() {
let (_relay, inbox_base) = start_relay().await;
let client = PubkyHttpClient::new().unwrap();
let channel = random_channel(&inbox_base);
channel.produce(&client, b"no ack coming").await.unwrap();
let result = channel
.await_ack(&client, Some(Duration::from_millis(500)))
.await
.unwrap();
assert_eq!(result, Some(false));
}
#[tokio::test]
async fn test_poll_returns_none_on_zero_timeout() {
let (_relay, inbox_base) = start_relay().await;
let client = PubkyHttpClient::new().unwrap();
let channel = random_channel(&inbox_base);
let result = channel.poll(&client, Some(Duration::ZERO)).await.unwrap();
assert!(result.is_none());
}
#[tokio::test]
async fn test_poll_succeeds_after_delayed_produce() {
let (_relay, inbox_base) = start_relay().await;
let client = PubkyHttpClient::new().unwrap();
let channel = random_channel(&inbox_base);
let chan = channel.clone();
let produce_handle = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(300)).await;
let client = PubkyHttpClient::new().unwrap();
chan.produce(&client, b"delayed msg").await.unwrap();
});
let result = channel
.poll(&client, Some(Duration::from_secs(10)))
.await
.unwrap()
.unwrap();
assert_eq!(result, b"delayed msg");
produce_handle.await.unwrap();
}
#[tokio::test]
async fn test_encrypted_concurrent() {
let (_relay, inbox_base) = start_relay().await;
let encrypted_channel = EncryptedHttpRelayInboxChannel::random_secret(inbox_base).unwrap();
let chan = encrypted_channel.clone();
let produce_handle = tokio::spawn(async move {
let client = PubkyHttpClient::new().unwrap();
chan.produce(&client, b"Hello, world!").await.unwrap();
});
let chan = encrypted_channel.clone();
let poll_handle = tokio::spawn(async move {
let client = PubkyHttpClient::new().unwrap();
let response = chan.poll(&client, None).await.unwrap().unwrap();
assert_eq!(response, b"Hello, world!");
});
let (produce_result, poll_result) = tokio::join!(produce_handle, poll_handle);
produce_result.unwrap();
poll_result.unwrap();
}
#[tokio::test]
async fn test_poll_errors_after_max_failures() {
use httpmock::prelude::*;
let server = MockServer::start();
server.mock(|when, then| {
when.method(GET).path_contains("/inbox/");
then.status(500).body("Internal Server Error");
});
let base_url = Url::parse(&format!("{}/inbox", server.base_url())).unwrap();
let channel = random_channel(&base_url);
let client = PubkyHttpClient::new().unwrap();
let result = channel.poll(&client, Some(Duration::from_secs(30))).await;
assert!(
result.is_err(),
"Expected error after MAX_FAILURES, got {result:?}"
);
}
}