use std::pin::Pin;
use crate::PublicKey;
use base64::Engine;
use eventsource_stream::Eventsource;
use futures_util::{Stream, StreamExt};
use pubky_common::crypto::Hash;
use reqwest::Method;
use url::Url;
pub use pubky_common::events::{EventCursor, EventType};
use crate::{
Pkdns, PubkyHttpClient, PubkyResource, cross_log,
errors::{Error, RequestError, Result},
};
/// A single event received from a homeserver's `/events-stream` endpoint.
///
/// Values of this type are produced by `parse_sse_event` from one
/// server-sent event.
#[derive(Debug, Clone)]
pub struct Event {
/// Kind of change: built from the SSE event name (`PUT` carries the decoded
/// content hash, `DEL` maps to a delete).
pub event_type: EventType,
/// Resource the event refers to, parsed from the first line of the SSE data.
pub resource: PubkyResource,
/// Cursor parsed from the `cursor: ` line of the SSE data.
pub cursor: EventCursor,
}
/// Builder that collects the parameters of an `/events-stream` request and
/// opens the stream via `subscribe`.
#[derive(Clone, Debug)]
pub struct EventStreamBuilder {
/// HTTP client used to send the request (and to resolve the homeserver when
/// none is set explicitly).
client: PubkyHttpClient,
/// Users to subscribe to, each with an optional starting cursor. Sent as
/// repeated `user` query parameters; `add_users` caps the list at 50.
users: Vec<(PublicKey, Option<EventCursor>)>,
/// Homeserver to connect to. When `None`, it is resolved from the first
/// user at subscribe time.
homeserver: Option<PublicKey>,
/// Value of the optional `limit` query parameter.
limit: Option<u16>,
/// When true, `live=true` is added to the query. Rejected together with
/// `reverse` at subscribe time.
live: bool,
/// When true, `reverse=true` is added to the query.
reverse: bool,
/// Value of the optional `path` query parameter.
path: Option<String>,
}
impl EventStreamBuilder {
#[must_use]
pub fn for_user(
client: PubkyHttpClient,
user: &PublicKey,
cursor: Option<EventCursor>,
) -> Self {
Self {
client,
users: vec![(user.clone(), cursor)],
homeserver: None,
limit: None,
live: false,
reverse: false,
path: None,
}
}
#[must_use]
pub fn for_homeserver(client: PubkyHttpClient, homeserver: &PublicKey) -> Self {
Self {
client,
users: Vec::new(),
homeserver: Some(homeserver.clone()),
limit: None,
live: false,
reverse: false,
path: None,
}
}
pub fn add_users<'a>(
mut self,
users: impl IntoIterator<Item = (&'a PublicKey, Option<EventCursor>)>,
) -> Result<Self> {
for (user, cursor) in users {
if let Some(existing) = self.users.iter_mut().find(|(u, _)| u == user) {
existing.1 = cursor;
continue;
}
if self.users.len() >= 50 {
return Err(Error::from(RequestError::Validation {
message: "Cannot subscribe to more than 50 users".into(),
}));
}
self.users.push((user.clone(), cursor));
}
Ok(self)
}
#[must_use]
pub const fn limit(mut self, limit: u16) -> Self {
self.limit = Some(limit);
self
}
#[must_use]
pub const fn live(mut self) -> Self {
self.live = true;
self
}
#[must_use]
pub const fn reverse(mut self) -> Self {
self.reverse = true;
self
}
#[must_use]
pub fn path<S: Into<String>>(mut self, path: S) -> Self {
self.path = Some(path.into());
self
}
fn build_request_url(&self, homeserver: &PublicKey) -> Result<Url> {
let mut url = Url::parse(&format!("https://{}/events-stream", homeserver.z32()))?;
{
let mut query = url.query_pairs_mut();
for (user, cursor) in &self.users {
if let Some(c) = cursor {
query.append_pair("user", &format!("{}:{c}", user.z32()));
} else {
query.append_pair("user", &user.z32());
}
}
if let Some(limit) = self.limit {
query.append_pair("limit", &limit.to_string());
}
if self.live {
query.append_pair("live", "true");
}
if self.reverse {
query.append_pair("reverse", "true");
}
if let Some(path) = &self.path {
query.append_pair("path", path);
}
}
cross_log!(debug, "Event stream URL: {}", url);
Ok(url)
}
async fn subscribe_internal(self) -> Result<impl Stream<Item = Result<Event>>> {
if self.live && self.reverse {
return Err(Error::from(RequestError::Validation {
message: "Cannot use live mode with reverse ordering".into(),
}));
}
if self.users.is_empty() {
return Err(Error::from(RequestError::Validation {
message: "At least one user must be specified".into(),
}));
}
if self.users.len() > 50 {
return Err(Error::from(RequestError::Validation {
message: "Cannot subscribe to more than 50 users".into(),
}));
}
let homeserver = if let Some(hs) = &self.homeserver {
hs.clone()
} else {
let (first_user, _) = &self.users[0];
Pkdns::with_client(self.client.clone())
.get_homeserver_of(first_user)
.await
.ok_or_else(|| {
Error::from(RequestError::Validation {
message: format!("Could not resolve homeserver for user {first_user}"),
})
})?
};
cross_log!(
info,
"Subscribing to event stream for {} user(s) on homeserver {}",
self.users.len(),
homeserver
);
let url = self.build_request_url(&homeserver)?;
let response = self
.client
.cross_request(Method::GET, url)
.await?
.send()
.await?;
if !response.status().is_success() {
let status = response.status();
let message = format!("Event stream request failed with status {status}");
return Err(Error::from(RequestError::Server { status, message }));
}
let sse_stream = response.bytes_stream().eventsource();
let event_stream = sse_stream.filter_map(|result| async move {
match result {
Ok(sse_event) => match parse_sse_event(&sse_event) {
Ok(event) => Some(Ok(event)),
Err(e) => {
cross_log!(error, "Failed to parse SSE event, skipping: {}", e);
None
}
},
Err(e) => {
cross_log!(error, "SSE stream error: {}", e);
Some(Err(Error::from(RequestError::Validation {
message: format!("SSE stream error: {e}"),
})))
}
}
});
Ok(event_stream)
}
#[cfg(not(target_arch = "wasm32"))]
pub async fn subscribe(self) -> Result<Pin<Box<dyn Stream<Item = Result<Event>> + Send>>> {
let stream = self.subscribe_internal().await?;
Ok(Box::pin(stream))
}
#[cfg(target_arch = "wasm32")]
pub async fn subscribe(self) -> Result<Pin<Box<dyn Stream<Item = Result<Event>>>>> {
let stream = self.subscribe_internal().await?;
Ok(Box::pin(stream))
}
}
fn parse_sse_event(sse: &eventsource_stream::Event) -> Result<Event> {
let mut path: Option<String> = None;
let mut cursor: Option<EventCursor> = None;
let mut content_hash_base64: Option<String> = None;
for (i, line) in sse.data.lines().enumerate() {
if let Some(cursor_str) = line.strip_prefix("cursor: ") {
cursor = Some(cursor_str.parse::<EventCursor>().map_err(|e| {
Error::from(RequestError::Validation {
message: format!("Invalid cursor format '{cursor_str}': {e}"),
})
})?);
} else if let Some(hash) = line.strip_prefix("content_hash: ") {
content_hash_base64 = Some(hash.to_string());
} else if i == 0 {
path = Some(line.to_string());
}
}
let path = path.ok_or_else(|| {
Error::from(RequestError::Validation {
message: "SSE event missing path (expected as first line)".into(),
})
})?;
let resource: PubkyResource = path.parse().map_err(|e| {
Error::from(RequestError::Validation {
message: format!("Invalid resource path '{path}': {e}"),
})
})?;
let cursor = cursor.ok_or_else(|| {
Error::from(RequestError::Validation {
message: "SSE event missing cursor line".into(),
})
})?;
let event_type = match sse.event.as_str() {
"PUT" => {
let content_hash = decode_content_hash(content_hash_base64.as_deref())?;
EventType::Put { content_hash }
}
"DEL" => EventType::Delete,
other => {
return Err(Error::from(RequestError::Validation {
message: format!("Unknown event type: {other}"),
}));
}
};
Ok(Event {
event_type,
resource,
cursor,
})
}
fn decode_content_hash(content_hash_base64: Option<&str>) -> Result<Hash> {
let b64 = content_hash_base64
.filter(|s| !s.is_empty())
.ok_or_else(|| {
Error::from(RequestError::Validation {
message: "PUT event missing required content_hash".into(),
})
})?;
let bytes = base64::engine::general_purpose::STANDARD
.decode(b64)
.map_err(|e| {
Error::from(RequestError::Validation {
message: format!("Invalid content_hash base64 encoding: {e}"),
})
})?;
let hash_bytes: [u8; 32] = bytes.try_into().map_err(|bytes: Vec<u8>| {
Error::from(RequestError::Validation {
message: format!(
"content_hash must be exactly 32 bytes, got {} bytes",
bytes.len()
),
})
})?;
Ok(Hash::from_bytes(hash_bytes))
}
// Unit tests for SSE event parsing (`parse_sse_event`) and for
// `EventStreamBuilder` construction, URL building and validation.
#[cfg(test)]
mod tests {
use super::*;
// Builds an SSE event with the given event name and data; `id` is empty and
// `retry` is unset.
fn make_sse(event: &str, data: &str) -> eventsource_stream::Event {
eventsource_stream::Event {
event: event.to_string(),
data: data.to_string(),
id: String::new(),
retry: None,
}
}
// Encodes 32 raw bytes with the same standard base64 engine the parser uses.
fn encode_hash(bytes: [u8; 32]) -> String {
base64::engine::general_purpose::STANDARD.encode(bytes)
}
// Happy path: a PUT event carrying path, cursor and content hash.
#[test]
fn parse_put_event_with_content_hash() {
let hash_bytes = [1u8; 32];
let hash_b64 = encode_hash(hash_bytes);
let sse = make_sse(
"PUT",
&format!(
"pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/example.txt\ncursor: 42\ncontent_hash: {hash_b64}"
),
);
let event = parse_sse_event(&sse).unwrap();
assert!(matches!(event.event_type, EventType::Put { .. }));
assert_eq!(event.resource.path.as_str(), "/pub/example.txt");
assert_eq!(event.cursor.id(), 42);
assert_eq!(
event.event_type.content_hash(),
Some(&Hash::from_bytes(hash_bytes))
);
}
// A DEL event needs no content hash and exposes none.
#[test]
fn parse_del_event_without_content_hash() {
let sse = make_sse(
"DEL",
"pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/deleted.txt\ncursor: 100",
);
let event = parse_sse_event(&sse).unwrap();
assert_eq!(event.event_type, EventType::Delete);
assert_eq!(event.resource.path.as_str(), "/pub/deleted.txt");
assert_eq!(event.cursor.id(), 100);
assert_eq!(event.event_type.content_hash(), None);
}
// Unrecognised `key: value` lines after the first line must be ignored.
#[test]
fn parse_event_with_unknown_prefixed_lines_for_forward_compatibility() {
let hash_bytes = [2u8; 32];
let hash_b64 = encode_hash(hash_bytes);
let sse = make_sse(
"PUT",
&format!(
"pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncursor: 50\nfuture_field: some_value\nanother_future: 123\ncontent_hash: {hash_b64}"
),
);
let event = parse_sse_event(&sse).unwrap();
assert!(matches!(event.event_type, EventType::Put { .. }));
assert_eq!(event.cursor.id(), 50);
assert_eq!(
event.event_type.content_hash(),
Some(&Hash::from_bytes(hash_bytes))
);
}
// The `cursor` and `content_hash` lines may appear in either order.
#[test]
fn parse_event_with_lines_in_different_order() {
let hash_bytes = [3u8; 32];
let hash_b64 = encode_hash(hash_bytes);
let sse = make_sse(
"PUT",
&format!(
"pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/test.txt\ncontent_hash: {hash_b64}\ncursor: 999"
),
);
let event = parse_sse_event(&sse).unwrap();
assert_eq!(event.cursor.id(), 999);
assert_eq!(
event.event_type.content_hash(),
Some(&Hash::from_bytes(hash_bytes))
);
}
// Event names other than PUT/DEL are rejected.
#[test]
fn error_on_unknown_event_type() {
let sse = make_sse(
"PATCH",
"pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncursor: 1",
);
let result = parse_sse_event(&sse);
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("Unknown event type: PATCH"), "Got: {err}");
}
// Data whose first line is a `cursor: ` line has no path line at all.
#[test]
fn error_on_missing_path() {
let hash_b64 = encode_hash([0u8; 32]);
let sse = make_sse("PUT", &format!("cursor: 42\ncontent_hash: {hash_b64}"));
let result = parse_sse_event(&sse);
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("missing path") || err.contains("Invalid resource"),
"Got: {err}"
);
}
// The cursor line is mandatory.
#[test]
fn error_on_missing_cursor() {
let hash_b64 = encode_hash([0u8; 32]);
let sse = make_sse(
"PUT",
&format!(
"pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncontent_hash: {hash_b64}"
),
);
let result = parse_sse_event(&sse);
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("missing cursor"), "Got: {err}");
}
// A non-numeric cursor value fails to parse.
#[test]
fn error_on_invalid_cursor_format() {
let sse = make_sse(
"PUT",
"pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncursor: not_a_number",
);
let result = parse_sse_event(&sse);
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("Invalid cursor format"), "Got: {err}");
}
// A negative cursor value is rejected as an invalid cursor.
#[test]
fn error_on_negative_cursor() {
let sse = make_sse(
"PUT",
"pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncursor: -100",
);
let result = parse_sse_event(&sse);
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("Invalid cursor format"), "Got: {err}");
}
// The first line must parse as a `PubkyResource`.
#[test]
fn error_on_invalid_pubky_resource_path() {
let sse = make_sse("PUT", "not-a-valid-pubky-url\ncursor: 42");
let result = parse_sse_event(&sse);
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("Invalid resource path"), "Got: {err}");
}
// An empty `content_hash: ` value is reported like a missing hash.
#[test]
fn error_on_empty_content_hash() {
let sse = make_sse(
"PUT",
"pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncursor: 1\ncontent_hash: ",
);
let result = parse_sse_event(&sse);
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("missing required content_hash"), "Got: {err}");
}
// PUT events without any content_hash line are rejected.
#[test]
fn error_on_missing_content_hash() {
let sse = make_sse(
"PUT",
"pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncursor: 1",
);
let result = parse_sse_event(&sse);
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("missing required content_hash"), "Got: {err}");
}
// A cursor equal to i64::MAX (9223372036854775807) must be accepted.
#[test]
fn parse_event_with_large_cursor() {
let hash_b64 = encode_hash([0u8; 32]);
let sse = make_sse(
"PUT",
&format!(
"pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncursor: 9223372036854775807\ncontent_hash: {hash_b64}"
),
);
let event = parse_sse_event(&sse).unwrap();
assert_eq!(event.cursor.id(), 9_223_372_036_854_775_807_u64);
}
// A content hash that is not valid base64 is rejected.
#[test]
fn error_on_invalid_base64_content_hash() {
let sse = make_sse(
"PUT",
"pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncursor: 1\ncontent_hash: not-valid-base64!!!",
);
let result = parse_sse_event(&sse);
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("Invalid content_hash"), "Got: {err}");
}
// Valid base64 that decodes to 16 bytes instead of 32 is rejected.
#[test]
fn error_on_wrong_length_content_hash() {
let short_hash = base64::engine::general_purpose::STANDARD.encode([1u8; 16]);
let sse = make_sse(
"PUT",
&format!(
"pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncursor: 1\ncontent_hash: {short_hash}"
),
);
let result = parse_sse_event(&sse);
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("32 bytes"), "Got: {err}");
}
// Generates `count` public keys from freshly generated random keypairs.
fn test_pubkeys(count: usize) -> Vec<PublicKey> {
(0..count)
.map(|_| crate::Keypair::random().public_key())
.collect()
}
// Exercises both constructors, `add_users` (including cursor replacement
// for an already-present user) and the option setters.
#[test]
fn builder_constructors_and_add_users() {
let client = crate::PubkyHttpClient::testnet().unwrap();
let keys = test_pubkeys(4);
// `for_user` seeds the list with exactly one user and no homeserver.
let builder =
EventStreamBuilder::for_user(client.clone(), &keys[0], Some(EventCursor::new(42)));
assert_eq!(builder.users.len(), 1);
assert_eq!(builder.users[0].0, keys[0]);
assert_eq!(builder.users[0].1, Some(EventCursor::new(42)));
assert!(builder.homeserver.is_none());
// `for_homeserver` starts with no users.
let builder = EventStreamBuilder::for_homeserver(client.clone(), &keys[0]);
assert!(builder.users.is_empty());
assert_eq!(builder.homeserver.as_ref(), Some(&keys[0]));
// Users are appended in the order given.
let builder = EventStreamBuilder::for_homeserver(client.clone(), &keys[0])
.add_users([
(&keys[1], None),
(&keys[2], Some(EventCursor::new(100))),
(&keys[3], Some(EventCursor::new(200))),
])
.unwrap();
assert_eq!(builder.users.len(), 3);
assert_eq!(builder.users[0], (keys[1].clone(), None));
assert_eq!(
builder.users[1],
(keys[2].clone(), Some(EventCursor::new(100)))
);
assert_eq!(
builder.users[2],
(keys[3].clone(), Some(EventCursor::new(200)))
);
// Re-adding an existing user only replaces its cursor.
let builder = EventStreamBuilder::for_homeserver(client.clone(), &keys[0])
.add_users([(&keys[1], Some(EventCursor::new(10))), (&keys[2], None)])
.unwrap()
.add_users([(&keys[1], Some(EventCursor::new(999)))])
.unwrap();
assert_eq!(builder.users.len(), 2);
assert_eq!(builder.users[0].1, Some(EventCursor::new(999))); assert_eq!(builder.users[1].1, None);
// Setters: limit + live + path.
let builder = EventStreamBuilder::for_user(client.clone(), &keys[0], None)
.limit(100)
.live()
.path("/pub/posts/".to_string());
assert_eq!(builder.limit, Some(100));
assert!(builder.live);
assert!(!builder.reverse);
assert_eq!(builder.path, Some("/pub/posts/".to_string()));
// Setters: limit + reverse + path.
let builder = EventStreamBuilder::for_user(client, &keys[0], None)
.limit(50)
.reverse()
.path("/pub/files/".to_string());
assert_eq!(builder.limit, Some(50));
assert!(!builder.live);
assert!(builder.reverse);
assert_eq!(builder.path, Some("/pub/files/".to_string()));
}
// Adding 51 distinct users in one call must hit the 50-user cap.
#[test]
fn add_users_errors_on_exceeding_50_users() {
let client = crate::PubkyHttpClient::testnet().unwrap();
let keys = test_pubkeys(52);
let homeserver = &keys[0];
let users = &keys[1..];
let user_refs: Vec<_> = users.iter().map(|u| (u, None)).collect();
let result = EventStreamBuilder::for_homeserver(client, homeserver).add_users(user_refs);
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("50 users"), "Got: {err}");
}
// Checks scheme, path and every query parameter produced by
// `build_request_url`, for both a live and a reverse configuration.
#[test]
fn build_request_url_constructs_correct_query_params() {
let client = crate::PubkyHttpClient::testnet().unwrap();
let keys = test_pubkeys(3);
let homeserver = &keys[0];
let user1 = &keys[1];
let user2 = &keys[2];
let builder = EventStreamBuilder::for_homeserver(client.clone(), homeserver)
.add_users([(user1, None), (user2, Some(EventCursor::new(42)))])
.unwrap()
.limit(100)
.live()
.path("/pub/posts/");
let url = builder.build_request_url(homeserver).unwrap();
assert_eq!(url.scheme(), "https");
assert_eq!(url.path(), "/events-stream");
// Expected pairs: 2x user, limit, live, path.
let query: Vec<_> = url.query_pairs().collect();
assert_eq!(query.len(), 5, "Should have 5 query params: {:?}", query);
let user_params: Vec<_> = query
.iter()
.filter(|(k, _)| k == "user")
.map(|(_, v)| v.to_string())
.collect();
assert_eq!(user_params.len(), 2);
assert!(
user_params.iter().any(|v| v == &user1.z32()),
"Should have user1 without cursor"
);
assert!(
user_params
.iter()
.any(|v| v == &format!("{}:42", user2.z32())),
"Should have user2 with cursor"
);
assert!(query.iter().any(|(k, v)| k == "limit" && v == "100"));
assert!(query.iter().any(|(k, v)| k == "live" && v == "true"));
assert!(query.iter().any(|(k, v)| k == "path" && v == "/pub/posts/"));
// Reverse configuration: `reverse=true` present, `live` absent.
let builder_reverse = EventStreamBuilder::for_homeserver(client, homeserver)
.add_users([(user1, None)])
.unwrap()
.reverse()
.limit(50);
let url_reverse = builder_reverse.build_request_url(homeserver).unwrap();
let query_reverse: Vec<_> = url_reverse.query_pairs().collect();
assert!(
query_reverse
.iter()
.any(|(k, v)| k == "reverse" && v == "true")
);
assert!(
!query_reverse.iter().any(|(k, _)| k == "live"),
"Should not have live param when reverse is set"
);
}
// Subscribing with an empty user list must fail validation; the expected
// error comes from the user-list check, which runs before any request is sent.
#[tokio::test]
async fn subscribe_fails_with_no_users() {
let client = crate::PubkyHttpClient::testnet().unwrap();
let keys = test_pubkeys(1);
let homeserver = &keys[0];
let empty: [(&PublicKey, Option<EventCursor>); 0] = [];
let builder = EventStreamBuilder::for_homeserver(client, homeserver)
.add_users(empty)
.unwrap();
assert!(builder.users.is_empty());
assert_eq!(builder.homeserver.as_ref(), Some(homeserver));
let result = builder.subscribe().await;
match result {
Ok(_) => panic!("Expected error, but subscribe succeeded"),
Err(e) => {
let err = e.to_string();
assert!(
err.contains("At least one user must be specified"),
"Got: {err}"
);
}
}
}
}