use std::time::Duration;
use peisear_core::notifications::{Severity, channel as channel_id, kind as kind_id};
use peisear_notify::{DispatchEvent, DispatchTx, dispatch_loop};
use peisear_notify::config::{SmtpConfig, TlsMode};
use peisear_notify::dispatch::DispatchContext;
use peisear_storage::{Pool, notifications as notif_store, pool, users};
use tokio::sync::mpsc;
/// Creates a brand-new SQLite database in its own directory under `/tmp`,
/// runs the migrations against it, and returns the connected pool.
///
/// The directory name combines the wall-clock timestamp (nanoseconds), the
/// process id, and a process-wide counter. The timestamp alone is not enough:
/// tests run in parallel threads, and on platforms with a coarse clock two
/// tests could otherwise end up sharing (and concurrently migrating) the same
/// database file.
///
/// # Panics
///
/// Panics if the system clock is before the Unix epoch, if the directory
/// cannot be created, or if connecting or migrating fails.
async fn fresh_pool() -> Pool {
    // Monotonic per-process sequence number; only atomicity matters here,
    // so relaxed ordering is sufficient.
    static NEXT_POOL: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("system clock is set before the Unix epoch")
        .as_nanos();
    let seq = NEXT_POOL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    let dir = format!(
        "/tmp/peisear-notify-it-{}-{}-{}",
        nanos,
        std::process::id(),
        seq
    );
    std::fs::create_dir_all(&dir).expect("failed to create test database directory");

    let url = format!("sqlite://{}/test.db", dir);
    let db = pool::connect(&url)
        .await
        .expect("failed to connect to test database");
    pool::migrate(&db)
        .await
        .expect("failed to migrate test database");
    db
}
/// Inserts a throwaway user and stores a notification preference for the
/// `BURNOUT_OVERLOAD` kind that lists both the in-app and email channels.
///
/// Returns the generated user id, which has the form `user-<nanoseconds>`.
/// The user's email address is `<user id>@example.com`.
///
/// Panics if either storage call fails.
async fn create_user_subscribed_to_email(pool: &Pool) -> String {
    // A nanosecond timestamp is used to keep ids distinct between calls.
    let stamp = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_nanos();
    let user_id = format!("user-{}", stamp);
    let email = format!("{}@example.com", user_id);

    // NOTE(review): "x" and "TestUser" are presumably a placeholder credential
    // and a display name — confirm against `users::insert`.
    users::insert(pool, &user_id, &email, "x", "TestUser")
        .await
        .unwrap();

    let channels = [channel_id::IN_APP, channel_id::EMAIL];
    // NOTE(review): `Severity::Info` looks like the minimum severity for this
    // preference — confirm against `upsert_preference`.
    notif_store::upsert_preference(
        pool,
        &user_id,
        kind_id::BURNOUT_OVERLOAD,
        &channels,
        Severity::Info,
    )
    .await
    .unwrap();

    user_id
}
/// Starts `dispatch_loop` on a background tokio task and returns the sending
/// half of the channel that feeds it.
///
/// The channel is bounded to 8 queued events. The task's join handle is
/// dropped, so the loop runs detached from the caller.
fn spawn_dispatch(ctx: DispatchContext) -> DispatchTx {
    let (sender, receiver) = mpsc::channel::<DispatchEvent>(8);
    // Dropping the handle detaches the task; nothing ever joins it.
    drop(tokio::spawn(dispatch_loop(ctx, receiver)));
    sender
}
/// Builds the canned `BURNOUT_OVERLOAD` event, at `Severity::Watch` and with
/// no JSON payload, addressed to `user_id`.
fn make_event(user_id: &str) -> DispatchEvent {
    let title = String::from("Sustained over-capacity streak");
    let body = String::from("Test body for integration test.");

    DispatchEvent {
        user_id: user_id.to_owned(),
        kind: kind_id::BURNOUT_OVERLOAD.to_string(),
        severity: Severity::Watch,
        title,
        body,
        payload_json: None,
    }
}
/// Polls storage every 50ms until `user_id` has at least one notification and
/// returns the first row that `recent_for_user` yields.
///
/// Panics if a storage query fails, or if no row has appeared once the
/// 10-second deadline has passed.
async fn wait_for_notification(
    pool: &Pool,
    user_id: &str,
) -> peisear_core::notifications::Notification {
    let give_up_at = std::time::Instant::now() + Duration::from_secs(10);
    loop {
        let first = notif_store::recent_for_user(pool, user_id, 10)
            .await
            .unwrap()
            .into_iter()
            .next();
        match first {
            Some(notification) => return notification,
            None => {
                // The deadline is only checked after an empty poll, so a row
                // that arrives late is still returned.
                if std::time::Instant::now() > give_up_at {
                    panic!("notification didn't arrive within 10s");
                }
            }
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
}
/// With no SMTP configuration in the dispatch context, a user whose
/// preference lists both in-app and email must end up with a notification
/// whose `dispatched_via` contains only `"in_app"`.
#[tokio::test]
async fn smtp_unconfigured_records_in_app_only_in_dispatched_via() {
    let db = fresh_pool().await;
    let recipient = create_user_subscribed_to_email(&db).await;

    let sender = spawn_dispatch(DispatchContext {
        db: db.clone(),
        smtp: None,
    });
    sender.send(make_event(&recipient)).await.unwrap();

    let stored = wait_for_notification(&db, &recipient).await;
    let expected_channels = vec!["in_app".to_string()];
    assert_eq!(
        stored.dispatched_via, expected_channels,
        "expected only in_app delivery; got: {:?}",
        stored.dispatched_via
    );
    assert_eq!(stored.kind, kind_id::BURNOUT_OVERLOAD);
}
/// When SMTP is configured but points at `127.0.0.1:1`, the stored
/// notification must still list only `"in_app"` in `dispatched_via`.
#[tokio::test]
async fn smtp_configured_but_unreachable_records_in_app_only() {
    let db = fresh_pool().await;
    let recipient = create_user_subscribed_to_email(&db).await;

    // Port 1 on loopback is expected to have no SMTP server listening.
    let dead_smtp = SmtpConfig {
        host: "127.0.0.1".to_string(),
        port: 1,
        tls_mode: TlsMode::Implicit,
        username: "test@example.com".to_string(),
        password: "x".to_string(),
        from_address: "test@example.com".to_string(),
        from_name: None,
    };

    let sender = spawn_dispatch(DispatchContext {
        db: db.clone(),
        smtp: Some(dead_smtp),
    });
    sender.send(make_event(&recipient)).await.unwrap();

    let stored = wait_for_notification(&db, &recipient).await;
    let expected_channels = vec!["in_app".to_string()];
    assert_eq!(
        stored.dispatched_via, expected_channels,
        "expected only in_app delivery (email send fails); got: {:?}",
        stored.dispatched_via
    );
}
/// Sends the same event twice in quick succession and checks that only one
/// notification row is stored for the user.
#[tokio::test]
async fn cooldown_suppresses_second_dispatch_within_window() {
    let db = fresh_pool().await;
    let recipient = create_user_subscribed_to_email(&db).await;

    let sender = spawn_dispatch(DispatchContext {
        db: db.clone(),
        smtp: None,
    });

    // First event: wait until it has definitely been persisted.
    sender.send(make_event(&recipient)).await.unwrap();
    let _initial = wait_for_notification(&db, &recipient).await;

    // Second event: a row that must not appear cannot be polled for, so give
    // the loop a fixed grace period to process it before counting.
    sender.send(make_event(&recipient)).await.unwrap();
    tokio::time::sleep(Duration::from_millis(200)).await;

    let stored = notif_store::recent_for_user(&db, &recipient, 10)
        .await
        .unwrap();
    let row_count = stored.len();
    assert_eq!(
        row_count, 1,
        "cooldown should suppress the 2nd notification within 24h; got {} rows",
        row_count
    );
}