use crate::persistence::{
files::events::{EventCursor, EventEntity, EventRepository, EventType},
sql::UnifiedExecutor,
};
use crate::shared::webdav::EntryPath;
use sqlx::PgPool;
use tokio::sync::broadcast;
/// Upper bound of 50 users for an event stream.
///
/// NOTE(review): this constant is not referenced in this file; presumably it
/// caps how many users one event-stream request may cover. Confirm at the use site.
pub const MAX_EVENT_STREAM_USERS: usize = 50;
/// Name of the PostgreSQL notification channel that `EventsService::notify_event`
/// passes to `pg_notify`.
pub(crate) const PG_NOTIFY_CHANNEL: &str = "events";
/// Service that fans events out to in-process subscribers through a Tokio
/// broadcast channel and forwards event queries to `EventRepository`.
#[derive(Clone, Debug)]
pub struct EventsService {
/// Sending half of the broadcast channel; receivers are created from it on demand.
event_tx: broadcast::Sender<EventEntity>,
/// Capacity the broadcast channel was created with, kept so it can be reported later.
channel_capacity: usize,
}
impl EventsService {
pub fn new(channel_capacity: usize) -> Self {
let (event_tx, _rx) = broadcast::channel(channel_capacity);
Self {
event_tx,
channel_capacity,
}
}
pub fn subscribe(&self) -> broadcast::Receiver<EventEntity> {
self.event_tx.subscribe()
}
pub fn channel_capacity(&self) -> usize {
self.channel_capacity
}
pub async fn create_event<'a>(
&self,
user_id: i32,
event_type: EventType,
path: &EntryPath,
executor: &mut UnifiedExecutor<'a>,
) -> Result<EventEntity, sqlx::Error> {
EventRepository::create(user_id, event_type, path, executor).await
}
pub(crate) fn broadcast_event(&self, event: EventEntity) {
match self.event_tx.send(event) {
Ok(_) => {} Err(broadcast::error::SendError(_)) => {
}
}
}
pub async fn notify_event(pool: &PgPool) {
if let Err(e) = sqlx::query("SELECT pg_notify($1, '')")
.bind(PG_NOTIFY_CHANNEL)
.execute(pool)
.await
{
tracing::error!("Failed to send NOTIFY: {}", e);
}
}
pub async fn parse_cursor<'a>(
&self,
cursor: &str,
executor: &mut UnifiedExecutor<'a>,
) -> Result<EventCursor, sqlx::Error> {
EventRepository::parse_cursor(cursor, executor).await
}
pub async fn get_by_cursor<'a>(
&self,
cursor: Option<EventCursor>,
limit: Option<u16>,
executor: &mut UnifiedExecutor<'a>,
) -> Result<Vec<EventEntity>, sqlx::Error> {
EventRepository::get_by_cursor(cursor, limit, executor).await
}
pub async fn get_by_user_cursors<'a>(
&self,
user_cursors: Vec<(i32, Option<EventCursor>)>,
reverse: bool,
path_prefix: Option<&str>,
executor: &mut UnifiedExecutor<'a>,
) -> Result<Vec<EventEntity>, sqlx::Error> {
EventRepository::get_by_user_cursors(user_cursors, reverse, path_prefix, executor).await
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::persistence::sql::{user::UserRepository, SqlDb};
    use crate::shared::webdav::WebDavPath;
    use pubky_common::crypto::Keypair;

    /// Builds the `Put` event type shared by the tests: an all-zero content hash.
    fn put_with_zero_hash() -> EventType {
        EventType::Put {
            content_hash: pubky_common::crypto::Hash::from_bytes([0; 32]),
        }
    }

    /// An event created inside a transaction and then broadcast must reach a
    /// receiver that subscribed beforehand.
    #[tokio::test]
    #[pubky_test_utils::test]
    async fn test_events_service_create_and_broadcast() {
        let db = SqlDb::test().await;
        let service = EventsService::new(100);

        let pubkey = Keypair::random().public_key();
        let user = UserRepository::create(&pubkey, &mut db.pool().into())
            .await
            .unwrap();
        let entry_path = EntryPath::new(pubkey.clone(), WebDavPath::new("/test.txt").unwrap());

        // Subscribe before anything is sent so the broadcast is not missed.
        let mut receiver = service.subscribe();

        let mut transaction = db.pool().begin().await.unwrap();
        let created = service
            .create_event(
                user.id,
                put_with_zero_hash(),
                &entry_path,
                &mut (&mut transaction).into(),
            )
            .await
            .unwrap();
        transaction.commit().await.unwrap();

        // Broadcasting happens only after the commit.
        service.broadcast_event(created.clone());

        let delivered = receiver.recv().await.unwrap();
        assert_eq!(delivered.id, created.id);
        assert_eq!(delivered.user_id, user.id);
        assert!(matches!(delivered.event_type, EventType::Put { .. }));
    }

    /// With five events stored, asking for three events from cursor 2 must yield
    /// the events with ids 3, 4 and 5.
    #[tokio::test]
    #[pubky_test_utils::test]
    async fn test_events_service_get_by_cursor() {
        let db = SqlDb::test().await;
        let service = EventsService::new(100);

        let pubkey = Keypair::random().public_key();
        let user = UserRepository::create(&pubkey, &mut db.pool().into())
            .await
            .unwrap();

        for index in 0..5 {
            let web_path = WebDavPath::new(&format!("/test{}.txt", index)).unwrap();
            let entry_path = EntryPath::new(pubkey.clone(), web_path);
            service
                .create_event(
                    user.id,
                    put_with_zero_hash(),
                    &entry_path,
                    &mut db.pool().into(),
                )
                .await
                .unwrap();
        }

        let page = service
            .get_by_cursor(Some(EventCursor::new(2)), Some(3), &mut db.pool().into())
            .await
            .unwrap();

        assert_eq!(page.len(), 3);
        assert_eq!(page[0].id, 3);
        assert_eq!(page[1].id, 4);
        assert_eq!(page[2].id, 5);
    }
}