#![cfg(feature = "grpc")]
use chrono::Utc;
use serde_json::json;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use this::core::events::EventBus;
use this::core::{EntityCreator, EntityFetcher};
use this::events::sinks::in_app::{NotificationStore, StoredNotification};
use this::server::entity_registry::{EntityDescriptor, EntityRegistry};
use this::server::exposure::grpc::GrpcExposure;
use this::server::host::ServerHost;
use this::storage::InMemoryLinkService;
use tokio::net::TcpListener;
use uuid::Uuid;
/// In-memory entity store backing both the fetcher and creator roles in these tests.
///
/// Clones share the same underlying map because it lives behind an `Arc`.
#[derive(Clone)]
struct TestEntityStore {
    /// Entity type name; stamped onto created entities and used in not-found errors.
    entity_type: String,
    /// Stored entities as JSON values, keyed by their id.
    entities: Arc<tokio::sync::RwLock<HashMap<Uuid, serde_json::Value>>>,
}

impl TestEntityStore {
    /// Creates an empty store for entities of type `entity_type`.
    fn new(entity_type: &str) -> Self {
        let entity_type = String::from(entity_type);
        // The default value of the field type is an empty map behind a fresh lock.
        let entities = Arc::default();
        Self {
            entity_type,
            entities,
        }
    }
}
#[async_trait::async_trait]
impl EntityFetcher for TestEntityStore {
    /// Returns a clone of the JSON stored under `entity_id`.
    ///
    /// # Errors
    ///
    /// Fails with a message naming the entity type and id when nothing is stored
    /// under `entity_id`.
    async fn fetch_as_json(&self, entity_id: &Uuid) -> anyhow::Result<serde_json::Value> {
        let entities = self.entities.read().await;
        entities
            .get(entity_id)
            .cloned()
            .ok_or_else(|| anyhow::anyhow!("{} not found: {}", self.entity_type, entity_id))
    }

    /// Returns up to `limit` stored entities after skipping `offset` of them.
    ///
    /// `limit` defaults to 50 and `offset` to 0. Negative values are treated as 0,
    /// so a negative `limit` yields an empty list. Entities come straight from
    /// `HashMap::values`, so their order is unspecified.
    async fn list_as_json(
        &self,
        limit: Option<i32>,
        offset: Option<i32>,
    ) -> anyhow::Result<Vec<serde_json::Value>> {
        // Clamp before casting: a bare `as usize` on a negative `i32` sign-extends
        // to a huge value, which would skip every entity (offset) or remove the
        // cap entirely (limit).
        let offset = offset.unwrap_or(0).max(0) as usize;
        let limit = limit.unwrap_or(50).max(0) as usize;

        let entities = self.entities.read().await;
        Ok(entities
            .values()
            .skip(offset)
            .take(limit)
            .cloned()
            .collect())
    }
}
#[async_trait::async_trait]
impl EntityCreator for TestEntityStore {
    /// Stores a new entity built from `entity_data` and returns the stored JSON.
    ///
    /// A fresh v4 UUID, the store's entity type and the current UTC time (RFC 3339)
    /// are written under `id`, `type` and `created_at`, overwriting any such keys in
    /// the payload. A payload that is not a JSON object contributes no fields.
    async fn create_from_json(
        &self,
        entity_data: serde_json::Value,
    ) -> anyhow::Result<serde_json::Value> {
        let id = Uuid::new_v4();
        let created_at = chrono::Utc::now().to_rfc3339();

        // Start from the payload's fields, or from nothing when it is not an object.
        let mut fields = match entity_data {
            serde_json::Value::Object(map) => map,
            _ => serde_json::Map::new(),
        };
        fields.insert("id".to_string(), serde_json::Value::from(id.to_string()));
        fields.insert(
            "type".to_string(),
            serde_json::Value::from(self.entity_type.clone()),
        );
        fields.insert("created_at".to_string(), serde_json::Value::from(created_at));

        let stored = serde_json::Value::Object(fields);
        let mut entities = self.entities.write().await;
        entities.insert(id, stored.clone());
        Ok(stored)
    }

    /// Shallow-merges the top-level keys of `entity_data` into the stored entity and
    /// returns the result.
    ///
    /// When either the stored value or `entity_data` is not a JSON object, the stored
    /// value is returned unchanged.
    ///
    /// # Errors
    ///
    /// Fails when no entity is stored under `entity_id`.
    async fn update_from_json(
        &self,
        entity_id: &Uuid,
        entity_data: serde_json::Value,
    ) -> anyhow::Result<serde_json::Value> {
        let mut entities = self.entities.write().await;
        let stored = match entities.get_mut(entity_id) {
            Some(value) => value,
            None => return Err(anyhow::anyhow!("not found: {}", entity_id)),
        };

        if let Some(target) = stored.as_object_mut() {
            if let Some(patch) = entity_data.as_object() {
                target.extend(patch.iter().map(|(key, value)| (key.clone(), value.clone())));
            }
        }
        Ok(stored.clone())
    }

    /// Removes the entity stored under `entity_id`.
    ///
    /// # Errors
    ///
    /// Fails when no entity is stored under `entity_id`.
    async fn delete(&self, entity_id: &Uuid) -> anyhow::Result<()> {
        let removed = self.entities.write().await.remove(entity_id);
        match removed {
            Some(_) => Ok(()),
            None => Err(anyhow::anyhow!("not found: {}", entity_id)),
        }
    }
}
/// Minimal entity descriptor for tests: it only carries the two names.
struct TestEntityDescriptor {
    /// Singular entity type name.
    entity_type: String,
    /// Plural form of the entity type name.
    plural: String,
}

impl TestEntityDescriptor {
    /// Builds a descriptor owning copies of the given singular and plural names.
    fn new(entity_type: &str, plural: &str) -> Self {
        let entity_type = entity_type.to_owned();
        let plural = plural.to_owned();
        Self {
            entity_type,
            plural,
        }
    }
}
impl EntityDescriptor for TestEntityDescriptor {
    /// Singular entity type name supplied at construction.
    fn entity_type(&self) -> &str {
        self.entity_type.as_str()
    }

    /// Plural name supplied at construction.
    fn plural(&self) -> &str {
        self.plural.as_str()
    }

    /// Returns an empty router: this descriptor contributes no routes of its own.
    fn build_routes(&self) -> axum::Router {
        let empty = axum::Router::new();
        empty
    }
}
/// Assembles a `ServerHost` with a single in-memory `order` entity, an event bus
/// built with `EventBus::new(256)`, and a fresh notification store.
///
/// The notification store is returned alongside the host so tests can seed
/// notifications directly.
///
/// # Panics
///
/// Panics if `ServerHost::from_builder_components` does not succeed.
fn build_test_host() -> (Arc<ServerHost>, Arc<NotificationStore>) {
    use this::config::LinksConfig;

    // One store serves as both fetcher and creator; its clones share state.
    let orders = TestEntityStore::new("order");

    let mut fetchers: HashMap<String, Arc<dyn EntityFetcher>> = HashMap::new();
    fetchers.insert(String::from("order"), Arc::new(orders.clone()));

    let mut creators: HashMap<String, Arc<dyn EntityCreator>> = HashMap::new();
    creators.insert(String::from("order"), Arc::new(orders));

    let mut registry = EntityRegistry::new();
    registry.register(Box::new(TestEntityDescriptor::new("order", "orders")));

    let notifications = Arc::new(NotificationStore::new());

    let built = ServerHost::from_builder_components(
        Arc::new(InMemoryLinkService::new()),
        LinksConfig::default_config(),
        registry,
        fetchers,
        creators,
    )
    .unwrap();
    let host = built
        .with_event_bus(EventBus::new(256))
        .with_notification_store(notifications.clone());

    (Arc::new(host), notifications)
}
/// Starts the gRPC router for a fresh test host on an OS-assigned loopback port.
///
/// The server runs in a detached background task. Returns the bound address, the
/// host and the notification store.
///
/// # Panics
///
/// Panics if the router cannot be built or the listener cannot be bound.
async fn start_server() -> (SocketAddr, Arc<ServerHost>, Arc<NotificationStore>) {
    let (host, store) = build_test_host();
    let router = GrpcExposure::build_router(host.clone()).unwrap();

    // Port 0 lets the OS choose a free port.
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let bound_addr = listener.local_addr().unwrap();

    let serve = async move {
        axum::serve(listener, router).await.unwrap();
    };
    tokio::spawn(serve);

    // Short pause so the spawned task gets a chance to run before clients connect.
    let startup_grace = Duration::from_millis(50);
    tokio::time::sleep(startup_grace).await;

    (bound_addr, host, store)
}
/// Connects a `NotificationServiceClient` to the test server at `addr` over plain HTTP.
///
/// # Panics
///
/// Panics if the connection cannot be established.
async fn notification_client(
    addr: SocketAddr,
) -> this::server::exposure::grpc::proto::notification_service_client::NotificationServiceClient<
    tonic::transport::Channel,
> {
    use this::server::exposure::grpc::proto::notification_service_client::NotificationServiceClient as Client;

    let endpoint = format!("http://{}", addr);
    let connected = Client::connect(endpoint).await;
    connected.unwrap()
}
/// Connects an `EventServiceClient` to the test server at `addr` over plain HTTP.
///
/// # Panics
///
/// Panics if the connection cannot be established.
async fn event_client(
    addr: SocketAddr,
) -> this::server::exposure::grpc::proto::event_service_client::EventServiceClient<
    tonic::transport::Channel,
> {
    use this::server::exposure::grpc::proto::event_service_client::EventServiceClient as Client;

    let endpoint = format!("http://{}", addr);
    let connected = Client::connect(endpoint).await;
    connected.unwrap()
}
/// Connects an `EntityServiceClient` to the test server at `addr` over plain HTTP.
///
/// # Panics
///
/// Panics if the connection cannot be established.
async fn entity_client(
    addr: SocketAddr,
) -> this::server::exposure::grpc::proto::entity_service_client::EntityServiceClient<
    tonic::transport::Channel,
> {
    use this::server::exposure::grpc::proto::entity_service_client::EntityServiceClient as Client;

    let endpoint = format!("http://{}", addr);
    let connected = Client::connect(endpoint).await;
    connected.unwrap()
}
/// Inserts an unread notification of type `test` for `user_id` directly into `store`
/// and returns its freshly generated id.
///
/// The body is derived from `title`, and the payload marks the notification as
/// coming from the e2e tests.
async fn insert_test_notification(store: &NotificationStore, user_id: &str, title: &str) -> Uuid {
    let id = Uuid::new_v4();
    let notification = StoredNotification {
        id,
        recipient_id: user_id.to_owned(),
        notification_type: String::from("test"),
        title: title.to_owned(),
        body: format!("Body for {}", title),
        data: json!({"source": "e2e_test"}),
        read: false,
        created_at: Utc::now(),
    };
    store.insert(notification).await;
    id
}
/// Walks one user's notifications through list, unread-count, mark-as-read,
/// mark-all-as-read and delete over gRPC, checking the counters after each step.
#[tokio::test]
async fn test_e2e_grpc_notification_crud_flow() {
    use this::server::exposure::grpc::proto::*;

    let user = "user-A";
    let (addr, _host, store) = start_server().await;
    let mut client = notification_client(addr).await;

    // Seed three unread notifications straight into the store.
    let first_id = insert_test_notification(&store, user, "Notif 1").await;
    let second_id = insert_test_notification(&store, user, "Notif 2").await;
    let _third_id = insert_test_notification(&store, user, "Notif 3").await;

    // All three are listed and unread; the last inserted one is expected first.
    let list_request = ListNotificationsRequest {
        user_id: user.to_string(),
        limit: 10,
        offset: 0,
    };
    let listed = client
        .list_notifications(list_request)
        .await
        .unwrap()
        .into_inner();
    assert_eq!(listed.notifications.len(), 3);
    assert_eq!(listed.total, 3);
    assert_eq!(listed.unread, 3);
    assert_eq!(listed.notifications[0].title, "Notif 3");

    let unread_request = GetUnreadCountRequest {
        user_id: user.to_string(),
    };
    let unread_initial = client
        .get_unread_count(unread_request)
        .await
        .unwrap()
        .into_inner();
    assert_eq!(unread_initial.count, 3);

    // Mark the first two as read, leaving one unread.
    let mark_request = MarkAsReadRequest {
        notification_ids: vec![first_id.to_string(), second_id.to_string()],
        user_id: Some(user.to_string()),
    };
    let marked = client
        .mark_as_read(mark_request)
        .await
        .unwrap()
        .into_inner();
    assert_eq!(marked.marked_count, 2);

    let unread_request = GetUnreadCountRequest {
        user_id: user.to_string(),
    };
    let unread_after_mark = client
        .get_unread_count(unread_request)
        .await
        .unwrap()
        .into_inner();
    assert_eq!(unread_after_mark.count, 1);

    // Mark-all only has the one remaining unread notification to touch.
    let mark_all_request = MarkAllAsReadRequest {
        user_id: user.to_string(),
    };
    let marked_all = client
        .mark_all_as_read(mark_all_request)
        .await
        .unwrap()
        .into_inner();
    assert_eq!(marked_all.marked_count, 1);

    let unread_request = GetUnreadCountRequest {
        user_id: user.to_string(),
    };
    let unread_after_all = client
        .get_unread_count(unread_request)
        .await
        .unwrap()
        .into_inner();
    assert_eq!(unread_after_all.count, 0);

    // Deleting one notification reduces the listed total to two.
    let delete_request = DeleteNotificationRequest {
        notification_id: first_id.to_string(),
    };
    let deleted = client
        .delete_notification(delete_request)
        .await
        .unwrap()
        .into_inner();
    assert!(deleted.success);

    let list_request = ListNotificationsRequest {
        user_id: user.to_string(),
        limit: 10,
        offset: 0,
    };
    let listed_after_delete = client
        .list_notifications(list_request)
        .await
        .unwrap()
        .into_inner();
    assert_eq!(listed_after_delete.total, 2);
}
/// Lists ten notifications in pages of three and checks that the first two pages
/// are full, report the overall total, and share no ids.
#[tokio::test]
async fn test_e2e_grpc_notification_pagination() {
    use this::server::exposure::grpc::proto::*;

    let user = "user-A";
    let (addr, _host, store) = start_server().await;
    let mut client = notification_client(addr).await;

    let titles = (0..10).map(|i| format!("Notif {}", i));
    for title in titles {
        insert_test_notification(&store, user, &title).await;
    }

    let first_request = ListNotificationsRequest {
        user_id: user.to_string(),
        limit: 3,
        offset: 0,
    };
    let first_page = client
        .list_notifications(first_request)
        .await
        .unwrap()
        .into_inner();
    assert_eq!(first_page.notifications.len(), 3);
    assert_eq!(first_page.total, 10);

    let second_request = ListNotificationsRequest {
        user_id: user.to_string(),
        limit: 3,
        offset: 3,
    };
    let second_page = client
        .list_notifications(second_request)
        .await
        .unwrap()
        .into_inner();
    assert_eq!(second_page.notifications.len(), 3);

    // No id from the second page may also appear on the first.
    for later in &second_page.notifications {
        let seen_before = first_page
            .notifications
            .iter()
            .any(|earlier| earlier.id == later.id);
        assert!(!seen_before, "pages should not overlap");
    }
}
/// Subscribes to user-A's notification stream and checks that a notification
/// inserted for user-A is delivered while one for user-B is not.
#[tokio::test]
async fn test_e2e_grpc_notification_streaming() {
    use this::server::exposure::grpc::proto::*;
    use tokio_stream::StreamExt;

    let (addr, _host, store) = start_server().await;
    let mut client = notification_client(addr).await;

    // Subscribe before inserting so the stream is open when the notification arrives.
    let subscription = SubscribeNotificationsRequest {
        user_id: Some("user-A".to_string()),
    };
    let mut stream = client
        .subscribe_notifications(subscription)
        .await
        .unwrap()
        .into_inner();

    let expected_id =
        insert_test_notification(&store, "user-A", "Streamed notification").await;

    let delivery_window = Duration::from_millis(200);
    let received = tokio::time::timeout(delivery_window, stream.next())
        .await
        .expect("timed out waiting for notification")
        .expect("stream ended")
        .expect("error");
    assert_eq!(received.id, expected_id.to_string());
    assert_eq!(received.recipient_id, "user-A");
    assert_eq!(received.title, "Streamed notification");
    assert!(!received.read);

    // A notification for another user must not show up: the wait has to time out.
    insert_test_notification(&store, "user-B", "Not for A").await;
    let quiet_window = Duration::from_millis(100);
    let outcome = tokio::time::timeout(quiet_window, stream.next()).await;
    assert!(
        outcome.is_err(),
        "should not receive user-B notification on user-A stream"
    );
}
/// Subscribes to `created` entity events for `order` and checks that an event
/// published on the host's event bus reaches the gRPC stream with matching fields.
#[tokio::test]
async fn test_e2e_grpc_event_stream_on_entity_create() {
    use this::server::exposure::grpc::proto::*;
    use tokio_stream::StreamExt;

    let (addr, host, _store) = start_server().await;
    let mut events = event_client(addr).await;

    let filter = SubscribeRequest {
        entity_type: Some("order".to_string()),
        entity_id: None,
        event_type: Some("created".to_string()),
        kind: Some("entity".to_string()),
        link_type: None,
    };
    let mut stream = events.subscribe(filter).await.unwrap().into_inner();

    // Publish directly on the bus; the id is kept to compare against the streamed event.
    let entity_id = Uuid::new_v4();
    let created = this::core::events::EntityEvent::Created {
        entity_type: "order".to_string(),
        entity_id,
        data: json!({"amount": 99.99}),
    };
    host.event_bus()
        .unwrap()
        .publish(this::core::events::FrameworkEvent::Entity(created));

    let delivery_window = Duration::from_millis(200);
    let received = tokio::time::timeout(delivery_window, stream.next())
        .await
        .expect("timed out")
        .expect("stream ended")
        .expect("error");
    assert_eq!(received.event_kind, "entity");
    assert_eq!(received.event_type, "created");
    assert_eq!(received.entity_type, "order");
    assert_eq!(received.entity_id, entity_id.to_string());
}
/// Creates an `order` through the entity service while an unfiltered event
/// subscription is open, then checks that a `created` entity event arrives.
///
/// NOTE(review): the event is published manually on the bus after the create call,
/// so this test does not by itself show that `create_entity` emits one — confirm
/// whether that is intended.
#[tokio::test]
async fn test_e2e_grpc_entity_create_triggers_event() {
    use this::server::exposure::grpc::proto::*;
    use tokio_stream::StreamExt;

    let (addr, host, _store) = start_server().await;

    // Open an unfiltered event subscription first.
    let mut events = event_client(addr).await;
    let no_filter = SubscribeRequest {
        entity_type: None,
        entity_id: None,
        event_type: None,
        kind: None,
        link_type: None,
    };
    let mut event_stream = events.subscribe(no_filter).await.unwrap().into_inner();

    let mut entities = entity_client(addr).await;

    // Protobuf struct payload with a single string field: name = "Test Order".
    let payload = {
        use prost_types::value::Kind;
        let name_value = prost_types::Value {
            kind: Some(Kind::StringValue("Test Order".to_string())),
        };
        let fields: std::collections::BTreeMap<_, _> =
            std::iter::once(("name".to_string(), name_value)).collect();
        prost_types::Struct { fields }
    };
    let create_request = CreateEntityRequest {
        entity_type: "order".to_string(),
        data: Some(payload),
    };
    let created = entities
        .create_entity(create_request)
        .await
        .unwrap()
        .into_inner();
    assert!(created.data.is_some());

    let event = this::core::events::EntityEvent::Created {
        entity_type: "order".to_string(),
        entity_id: Uuid::new_v4(),
        data: json!({"name": "Test Order"}),
    };
    host.event_bus()
        .unwrap()
        .publish(this::core::events::FrameworkEvent::Entity(event));

    let delivery_window = Duration::from_millis(200);
    let received = tokio::time::timeout(delivery_window, event_stream.next())
        .await
        .expect("timed out")
        .expect("stream ended")
        .expect("error");
    assert_eq!(received.event_kind, "entity");
    assert_eq!(received.event_type, "created");
    assert_eq!(received.entity_type, "order");
}
/// Checks that one inserted notification is visible both on a live subscription
/// and through a subsequent list call, each made from its own client.
#[tokio::test]
async fn test_e2e_notification_visible_from_list_and_stream() {
    use this::server::exposure::grpc::proto::*;
    use tokio_stream::StreamExt;

    let user = "user-X";
    let (addr, _host, store) = start_server().await;
    let mut list_client = notification_client(addr).await;
    let mut stream_client = notification_client(addr).await;

    let subscription = SubscribeNotificationsRequest {
        user_id: Some(user.to_string()),
    };
    let mut stream = stream_client
        .subscribe_notifications(subscription)
        .await
        .unwrap()
        .into_inner();

    let expected_id = insert_test_notification(&store, user, "Cross-check").await;

    // First view: the live stream.
    let delivery_window = Duration::from_millis(200);
    let streamed = tokio::time::timeout(delivery_window, stream.next())
        .await
        .expect("timed out")
        .expect("stream ended")
        .expect("error");
    assert_eq!(streamed.id, expected_id.to_string());
    assert_eq!(streamed.title, "Cross-check");

    // Second view: the list endpoint.
    let list_request = ListNotificationsRequest {
        user_id: user.to_string(),
        limit: 10,
        offset: 0,
    };
    let listed = list_client
        .list_notifications(list_request)
        .await
        .unwrap()
        .into_inner();
    assert_eq!(listed.notifications.len(), 1);
    let only = &listed.notifications[0];
    assert_eq!(only.id, expected_id.to_string());
    assert_eq!(only.title, "Cross-check");
}
/// Checks that notification counts are tracked per user and that marking all of
/// alice's notifications as read leaves bob's unread count untouched.
#[tokio::test]
async fn test_e2e_notification_user_isolation() {
    use this::server::exposure::grpc::proto::*;

    let (addr, _host, store) = start_server().await;
    let mut client = notification_client(addr).await;

    // Two notifications for alice, one for bob.
    insert_test_notification(&store, "alice", "For Alice 1").await;
    insert_test_notification(&store, "alice", "For Alice 2").await;
    insert_test_notification(&store, "bob", "For Bob").await;

    let alice_request = ListNotificationsRequest {
        user_id: "alice".to_string(),
        limit: 10,
        offset: 0,
    };
    let alice_list = client
        .list_notifications(alice_request)
        .await
        .unwrap()
        .into_inner();
    assert_eq!(alice_list.total, 2);
    assert_eq!(alice_list.unread, 2);

    let bob_request = ListNotificationsRequest {
        user_id: "bob".to_string(),
        limit: 10,
        offset: 0,
    };
    let bob_list = client
        .list_notifications(bob_request)
        .await
        .unwrap()
        .into_inner();
    assert_eq!(bob_list.total, 1);
    assert_eq!(bob_list.unread, 1);

    // Only the call succeeding matters here; the response itself is discarded.
    let mark_alice = MarkAllAsReadRequest {
        user_id: "alice".to_string(),
    };
    client.mark_all_as_read(mark_alice).await.unwrap();

    let bob_unread_request = GetUnreadCountRequest {
        user_id: "bob".to_string(),
    };
    let bob_unread = client
        .get_unread_count(bob_unread_request)
        .await
        .unwrap()
        .into_inner();
    assert_eq!(bob_unread.count, 1, "Bob's count should be unaffected");
}
/// Opens an unfiltered event stream and a notification stream with no user filter
/// against the same server, then checks that each receives its own message.
#[tokio::test]
async fn test_e2e_event_and_notification_streams_coexist() {
    use this::server::exposure::grpc::proto::*;
    use tokio_stream::StreamExt;

    let (addr, host, store) = start_server().await;

    // Event subscription with every filter left unset.
    let mut events = event_client(addr).await;
    let event_filter = SubscribeRequest {
        entity_type: None,
        entity_id: None,
        event_type: None,
        kind: None,
        link_type: None,
    };
    let mut event_stream = events.subscribe(event_filter).await.unwrap().into_inner();

    // Notification subscription with no user id.
    let mut notifications = notification_client(addr).await;
    let notification_filter = SubscribeNotificationsRequest { user_id: None };
    let mut notif_stream = notifications
        .subscribe_notifications(notification_filter)
        .await
        .unwrap()
        .into_inner();

    // Produce one message of each kind: the event first, then the notification.
    let created = this::core::events::EntityEvent::Created {
        entity_type: "order".to_string(),
        entity_id: Uuid::new_v4(),
        data: json!({"status": "new"}),
    };
    host.event_bus()
        .unwrap()
        .publish(this::core::events::FrameworkEvent::Entity(created));
    insert_test_notification(&store, "user-A", "Coexist test").await;

    let delivery_window = Duration::from_millis(200);

    let event_msg = tokio::time::timeout(delivery_window, event_stream.next())
        .await
        .expect("timed out waiting for event")
        .expect("event stream ended")
        .expect("event error");
    assert_eq!(event_msg.event_kind, "entity");
    assert_eq!(event_msg.event_type, "created");

    let notif_msg = tokio::time::timeout(delivery_window, notif_stream.next())
        .await
        .expect("timed out waiting for notification")
        .expect("notification stream ended")
        .expect("notification error");
    assert_eq!(notif_msg.title, "Coexist test");
    assert_eq!(notif_msg.recipient_id, "user-A");
}