pub mod counter;
pub mod device_tokens;
pub mod in_app;
pub mod preferences;
pub mod push;
pub mod webhook;
pub mod websocket;
pub use counter::{CounterConfig, CounterOperation, CounterSink, EntityFieldUpdater};
pub use device_tokens::{DeviceToken, DeviceTokenStore, Platform};
pub use in_app::{InAppNotificationSink, NotificationStore};
pub use preferences::{NotificationPreferencesStore, UserPreferences};
#[cfg(feature = "push")]
pub use push::ExpoPushProvider;
pub use push::{PushNotificationSink, PushProvider};
pub use webhook::{HttpSender, WebhookConfig, WebhookSink};
pub use websocket::{WebSocketDispatcher, WebSocketSink};
use crate::config::sinks::SinkType;
use anyhow::Result;
use async_trait::async_trait;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
pub fn resolve_recipient(
explicit: Option<&str>,
payload: &Value,
context_vars: &HashMap<String, Value>,
) -> Option<String> {
explicit
.map(|s| s.to_string())
.or_else(|| {
payload
.get("recipient_id")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
})
.or_else(|| {
context_vars
.get("recipient_id")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
})
}
/// A delivery target that payloads can be sent to.
///
/// Implementations must be `Send + Sync + Debug`; `SinkRegistry` stores them
/// as shared `Arc<dyn Sink>` trait objects.
#[async_trait]
pub trait Sink: Send + Sync + std::fmt::Debug {
/// Delivers `payload` through this sink.
///
/// `recipient_id` optionally identifies who the delivery is addressed to and
/// `context_vars` carries additional named JSON values alongside the payload.
/// How (and whether) each of them is used is up to the implementation.
///
/// # Errors
///
/// Returns an error when the implementation fails to deliver the payload.
async fn deliver(
&self,
payload: Value,
recipient_id: Option<&str>,
context_vars: &HashMap<String, Value>,
) -> Result<()>;
/// Returns the sink's own name.
///
/// Note that `SinkRegistry` looks sinks up by the name passed to
/// `SinkRegistry::register`, which need not equal this value.
fn name(&self) -> &str;
/// Returns the `SinkType` category this sink belongs to.
fn sink_type(&self) -> SinkType;
}
/// A name-keyed collection of shared [`Sink`] handles.
///
/// The map lives behind an `RwLock`, so sinks can be registered and looked up
/// through a shared `&self` reference.
#[derive(Debug)]
pub struct SinkRegistry {
// Registration name -> sink. Registering an existing name replaces the entry.
sinks: RwLock<HashMap<String, Arc<dyn Sink>>>,
}
impl SinkRegistry {
    /// Creates a registry with no sinks registered.
    pub fn new() -> Self {
        Self {
            sinks: RwLock::new(HashMap::new()),
        }
    }

    /// Acquires the read guard, recovering the guard if the lock is poisoned.
    ///
    /// Every critical section in this type only performs plain map operations,
    /// so the map is still structurally valid after a panic elsewhere. Recovering
    /// keeps one panicking thread from turning every later registry call into a
    /// panic as well.
    fn read_sinks(&self) -> std::sync::RwLockReadGuard<'_, HashMap<String, Arc<dyn Sink>>> {
        self.sinks
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Acquires the write guard, recovering the guard if the lock is poisoned.
    ///
    /// See [`Self::read_sinks`] for why recovery is sound here.
    fn write_sinks(&self) -> std::sync::RwLockWriteGuard<'_, HashMap<String, Arc<dyn Sink>>> {
        self.sinks
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Registers `sink` under `name`, replacing any sink previously stored
    /// under the same name.
    pub fn register(&self, name: impl Into<String>, sink: Arc<dyn Sink>) {
        // Run the caller-supplied conversion before taking the lock so that no
        // foreign code executes inside the critical section.
        let key = name.into();
        // The guard is a temporary of this `let` statement and is released at
        // its end, while the replaced entry stays alive in `replaced`.
        let replaced = self.write_sinks().insert(key, sink);
        // Dropping the old sink may run an arbitrary `Drop` impl; doing it here,
        // outside the lock, means it can neither poison the lock nor deadlock
        // by calling back into the registry.
        drop(replaced);
    }

    /// Returns a shared handle to the sink registered under `name`, if any.
    pub fn get(&self, name: &str) -> Option<Arc<dyn Sink>> {
        self.read_sinks().get(name).cloned()
    }

    /// Returns the names of all registered sinks, in unspecified order.
    pub fn names(&self) -> Vec<String> {
        self.read_sinks().keys().cloned().collect()
    }

    /// Delivers `payload` through the sink registered as `sink_name`.
    ///
    /// `recipient_id` and `context_vars` are forwarded to the sink unchanged.
    ///
    /// # Errors
    ///
    /// Returns an error if no sink is registered under `sink_name`, or
    /// propagates whatever error the sink's own `deliver` returns.
    pub async fn deliver(
        &self,
        sink_name: &str,
        payload: Value,
        recipient_id: Option<&str>,
        context_vars: &HashMap<String, Value>,
    ) -> Result<()> {
        // `get` hands back an owned `Arc`, so the lock is already released by
        // the time the sink is awaited.
        let sink = self
            .get(sink_name)
            .ok_or_else(|| anyhow::anyhow!("sink '{}' not found in registry", sink_name))?;
        sink.deliver(payload, recipient_id, context_vars).await
    }

    /// Returns the number of registered sinks.
    pub fn len(&self) -> usize {
        self.read_sinks().len()
    }

    /// Returns `true` when no sinks are registered.
    pub fn is_empty(&self) -> bool {
        self.read_sinks().is_empty()
    }
}
impl Default for SinkRegistry {
fn default() -> Self {
Self::new()
}
}
/// Builds a [`SinkRegistry`] from sink configuration, holding the shared
/// stores that auto-wired sinks are constructed with.
pub struct SinkFactory {
// Passed to every auto-wired in-app notification sink.
notification_store: Arc<NotificationStore>,
// Passed to every auto-wired in-app notification sink.
preferences_store: Arc<NotificationPreferencesStore>,
// Not consumed by `build_registry`; exposed through `device_token_store()`.
device_token_store: Arc<DeviceTokenStore>,
}
impl SinkFactory {
pub fn new() -> Self {
Self {
notification_store: Arc::new(NotificationStore::new()),
preferences_store: Arc::new(NotificationPreferencesStore::new()),
device_token_store: Arc::new(DeviceTokenStore::new()),
}
}
pub fn with_stores(
notification_store: Arc<NotificationStore>,
preferences_store: Arc<NotificationPreferencesStore>,
device_token_store: Arc<DeviceTokenStore>,
) -> Self {
Self {
notification_store,
preferences_store,
device_token_store,
}
}
pub fn notification_store(&self) -> &Arc<NotificationStore> {
&self.notification_store
}
pub fn preferences_store(&self) -> &Arc<NotificationPreferencesStore> {
&self.preferences_store
}
pub fn device_token_store(&self) -> &Arc<DeviceTokenStore> {
&self.device_token_store
}
pub fn build_registry(
&self,
sink_configs: &[crate::config::sinks::SinkConfig],
) -> SinkRegistry {
let registry = SinkRegistry::new();
for config in sink_configs {
match config.sink_type {
SinkType::InApp => {
let sink = InAppNotificationSink::with_preferences(
self.notification_store.clone(),
self.preferences_store.clone(),
);
registry.register(&config.name, Arc::new(sink));
tracing::info!(
sink = %config.name,
"auto-wired InApp notification sink"
);
}
SinkType::Push => {
tracing::warn!(
sink = %config.name,
"Push sink requires a PushProvider — use ServerBuilder::with_push_provider() to wire it"
);
}
SinkType::WebSocket => {
tracing::warn!(
sink = %config.name,
"WebSocket sink will be wired automatically when WebSocketExposure is built"
);
}
SinkType::Webhook => {
tracing::warn!(
sink = %config.name,
"Webhook sink requires an HttpSender implementation — skipping auto-wire"
);
}
SinkType::Counter => {
tracing::warn!(
sink = %config.name,
"Counter sink requires an EntityFieldUpdater — use ServerBuilder::with_counter_updater() to wire it"
);
}
SinkType::Feed => {
tracing::warn!(
sink = %config.name,
"Feed sink is not yet implemented — skipping"
);
}
SinkType::Custom => {
tracing::warn!(
sink = %config.name,
"Custom sink requires manual registration — skipping auto-wire"
);
}
}
}
registry
}
}
impl Default for SinkFactory {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Everything a `TestSink` was asked to deliver: payload plus recipient.
    type DeliveryLog = Vec<(Value, Option<String>)>;

    /// Minimal sink that records each delivery so tests can inspect it.
    #[derive(Debug)]
    struct TestSink {
        sink_name: String,
        deliveries: Arc<tokio::sync::Mutex<DeliveryLog>>,
    }

    impl TestSink {
        fn new(name: &str) -> Self {
            Self {
                sink_name: name.to_string(),
                deliveries: Arc::new(tokio::sync::Mutex::new(Vec::new())),
            }
        }
    }

    #[async_trait]
    impl Sink for TestSink {
        async fn deliver(
            &self,
            payload: Value,
            recipient_id: Option<&str>,
            _context_vars: &HashMap<String, Value>,
        ) -> Result<()> {
            self.deliveries
                .lock()
                .await
                .push((payload, recipient_id.map(|s| s.to_string())));
            Ok(())
        }

        fn name(&self) -> &str {
            &self.sink_name
        }

        fn sink_type(&self) -> SinkType {
            SinkType::Custom
        }
    }

    /// Context map whose `"recipient_id"` entry is the given JSON value.
    fn context_with_recipient(value: Value) -> HashMap<String, Value> {
        let mut context = HashMap::new();
        context.insert("recipient_id".to_string(), value);
        context
    }

    #[test]
    fn test_registry_register_and_get() {
        let registry = SinkRegistry::new();
        let sink = Arc::new(TestSink::new("test-sink"));
        registry.register("test-sink", sink);
        assert_eq!(registry.len(), 1);
        assert!(registry.get("test-sink").is_some());
        assert!(registry.get("nonexistent").is_none());
    }

    #[test]
    fn test_registry_names() {
        let registry = SinkRegistry::new();
        registry.register("a", Arc::new(TestSink::new("a")));
        registry.register("b", Arc::new(TestSink::new("b")));
        // `names()` order is unspecified, so sort before comparing.
        let mut names = registry.names();
        names.sort();
        assert_eq!(names, vec!["a", "b"]);
    }

    #[tokio::test]
    async fn test_registry_deliver() {
        let registry = SinkRegistry::new();
        let sink = Arc::new(TestSink::new("test-sink"));
        let deliveries = sink.deliveries.clone();
        registry.register("test-sink", sink);
        let payload = json!({"title": "Hello", "body": "World"});
        registry
            .deliver(
                "test-sink",
                payload.clone(),
                Some("user-1"),
                &HashMap::new(),
            )
            .await
            .unwrap();
        let recorded = deliveries.lock().await;
        assert_eq!(recorded.len(), 1);
        assert_eq!(recorded[0].0, payload);
        assert_eq!(recorded[0].1.as_deref(), Some("user-1"));
    }

    #[tokio::test]
    async fn test_registry_deliver_unknown_sink() {
        let registry = SinkRegistry::new();
        let result = registry
            .deliver("nonexistent", json!({}), None, &HashMap::new())
            .await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("nonexistent"));
    }

    #[test]
    fn test_registry_replace_sink() {
        let registry = SinkRegistry::new();
        registry.register("s", Arc::new(TestSink::new("s-v1")));
        registry.register("s", Arc::new(TestSink::new("s-v2")));
        assert_eq!(registry.len(), 1);
        assert_eq!(registry.get("s").unwrap().name(), "s-v2");
    }

    #[test]
    fn test_registry_default_is_empty() {
        let registry = SinkRegistry::default();
        assert!(registry.is_empty());
        assert_eq!(registry.len(), 0);
    }

    #[test]
    fn test_resolve_recipient_prefers_explicit() {
        let payload = json!({"recipient_id": "from-payload"});
        let context = context_with_recipient(json!("from-context"));
        assert_eq!(
            resolve_recipient(Some("explicit"), &payload, &context).as_deref(),
            Some("explicit")
        );
    }

    #[test]
    fn test_resolve_recipient_falls_back_to_payload_then_context() {
        let context = context_with_recipient(json!("from-context"));

        let payload = json!({"recipient_id": "from-payload"});
        assert_eq!(
            resolve_recipient(None, &payload, &context).as_deref(),
            Some("from-payload")
        );

        let payload = json!({"title": "no recipient here"});
        assert_eq!(
            resolve_recipient(None, &payload, &context).as_deref(),
            Some("from-context")
        );
    }

    #[test]
    fn test_resolve_recipient_ignores_non_string_values() {
        // A numeric recipient in the payload is skipped, not stringified.
        let payload = json!({"recipient_id": 42});
        let context = context_with_recipient(json!("from-context"));
        assert_eq!(
            resolve_recipient(None, &payload, &context).as_deref(),
            Some("from-context")
        );

        // With no string anywhere, nothing is resolved.
        let context = context_with_recipient(json!(null));
        assert_eq!(resolve_recipient(None, &payload, &context), None);
    }

    #[test]
    fn test_resolve_recipient_none_when_absent() {
        assert_eq!(resolve_recipient(None, &json!({}), &HashMap::new()), None);
        // A payload that is not a JSON object has no fields to look up.
        assert_eq!(
            resolve_recipient(None, &json!("plain string"), &HashMap::new()),
            None
        );
    }
}