use chrono::{DateTime, TimeZone, Utc};
use rusqlite::{params, Connection, Result as SqlResult};
use serde::{Deserialize, Serialize};
/// Converts a UTC timestamp into fractional seconds since the Unix epoch.
///
/// The whole-second part and the sub-second nanoseconds are combined into a
/// single `f64`, which is the representation stored in the `REAL` timestamp
/// columns used by this module.
fn datetime_to_f64(dt: &DateTime<Utc>) -> f64 {
    let whole_seconds = dt.timestamp() as f64;
    let fraction = f64::from(dt.timestamp_subsec_nanos()) / 1_000_000_000.0;
    whole_seconds + fraction
}
/// Converts fractional seconds since the Unix epoch back into a UTC timestamp.
///
/// The value is split into whole seconds (`floor`) and a nanosecond remainder.
/// If the resulting pair cannot be represented (for example an infinite or
/// wildly out-of-range input), the current time is returned instead of
/// failing, so callers always get a usable timestamp.
fn f64_to_datetime(ts: f64) -> DateTime<Utc> {
    /// Largest valid sub-second nanosecond value.
    const MAX_SUBSEC_NANOS: f64 = 999_999_999.0;

    let secs = ts.floor() as i64;
    // For tiny negative inputs `ts - floor(ts)` rounds up to exactly 1.0, which
    // would yield 1_000_000_000 ns. chrono either reads that as a leap second
    // or rejects it (triggering the `Utc::now` fallback), so the remainder is
    // clamped to the last representable nanosecond instead.
    let nanos = ((ts - secs as f64) * 1_000_000_000.0).clamp(0.0, MAX_SUBSEC_NANOS) as u32;
    Utc.timestamp_opt(secs, nanos)
        .single()
        .unwrap_or_else(Utc::now)
}
/// Returns the current UTC time as fractional seconds since the Unix epoch.
fn now_f64() -> f64 {
    let current = Utc::now();
    datetime_to_f64(&current)
}
/// A row from the `memories` table that satisfied one of an agent's subscriptions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Notification {
/// `id` column of the matching `memories` row.
pub memory_id: String,
/// `namespace` column of the matching `memories` row.
pub namespace: String,
/// `content` column of the matching `memories` row.
pub content: String,
/// `importance` column of the matching `memories` row.
pub importance: f64,
/// `created_at` column of the matching `memories` row, converted from fractional Unix seconds.
pub created_at: DateTime<Utc>,
/// Namespace of the subscription that produced this notification (`"*"` for a wildcard subscription).
pub subscription_namespace: String,
/// `min_importance` of the subscription that produced this notification.
pub threshold: f64,
}
/// One row of the `subscriptions` table: an agent's interest in a namespace.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Subscription {
/// Identifier of the subscribing agent.
pub subscriber_id: String,
/// Namespace being watched; the literal `"*"` is treated as "every namespace" when querying.
pub namespace: String,
/// Minimum `importance` a memory must have (`>=`) to produce a notification.
pub min_importance: f64,
/// When the subscription row was written, converted from fractional Unix seconds.
pub created_at: DateTime<Utc>,
}
/// Stores namespace subscriptions and per-agent notification cursors in SQLite
/// and answers "what is new for this agent?" queries against the `memories` table.
///
/// The manager only borrows the connection for `'a`; it does not own or close it.
pub struct SubscriptionManager<'a> {
// Borrowed SQLite connection used for every statement this manager runs.
conn: &'a Connection,
}
impl<'a> SubscriptionManager<'a> {
pub fn new(conn: &'a Connection) -> Result<Self, Box<dyn std::error::Error>> {
Self::init_tables(conn)?;
Ok(Self { conn })
}
fn init_tables(conn: &Connection) -> SqlResult<()> {
conn.execute_batch(
r#"
CREATE TABLE IF NOT EXISTS subscriptions (
subscriber_id TEXT NOT NULL,
namespace TEXT NOT NULL,
min_importance REAL NOT NULL,
created_at REAL NOT NULL,
PRIMARY KEY (subscriber_id, namespace)
);
CREATE TABLE IF NOT EXISTS notification_cursor (
agent_id TEXT PRIMARY KEY,
last_checked REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_subscriptions_ns ON subscriptions(namespace);
"#,
)?;
Ok(())
}
pub fn subscribe(
&self,
agent_id: &str,
namespace: &str,
min_importance: f64,
) -> Result<(), Box<dyn std::error::Error>> {
let clamped = min_importance.clamp(0.0, 1.0);
self.conn.execute(
r#"
INSERT OR REPLACE INTO subscriptions (subscriber_id, namespace, min_importance, created_at)
VALUES (?, ?, ?, ?)
"#,
params![
agent_id,
namespace,
clamped,
now_f64(),
],
)?;
Ok(())
}
pub fn unsubscribe(
&self,
agent_id: &str,
namespace: &str,
) -> Result<bool, Box<dyn std::error::Error>> {
let affected = self.conn.execute(
"DELETE FROM subscriptions WHERE subscriber_id = ? AND namespace = ?",
params![agent_id, namespace],
)?;
Ok(affected > 0)
}
pub fn list_subscriptions(
&self,
agent_id: &str,
) -> Result<Vec<Subscription>, Box<dyn std::error::Error>> {
let mut stmt = self.conn.prepare(
"SELECT subscriber_id, namespace, min_importance, created_at FROM subscriptions WHERE subscriber_id = ?"
)?;
let rows = stmt.query_map(params![agent_id], |row| {
let created_at_f64: f64 = row.get(3)?;
Ok(Subscription {
subscriber_id: row.get(0)?,
namespace: row.get(1)?,
min_importance: row.get(2)?,
created_at: f64_to_datetime(created_at_f64),
})
})?;
Ok(rows.filter_map(|r| r.ok()).collect())
}
fn query_notifications_for_sub(
&self,
sub: &Subscription,
since: Option<&DateTime<Utc>>,
) -> Result<Vec<Notification>, Box<dyn std::error::Error>> {
let mut notifications = Vec::new();
if sub.namespace == "*" {
if let Some(since_dt) = since {
let mut stmt = self.conn.prepare(
"SELECT id, namespace, content, importance, created_at FROM memories
WHERE created_at > ? AND importance >= ?"
)?;
let rows = stmt.query_map(params![datetime_to_f64(since_dt), sub.min_importance], |row| {
let created_at_f64: f64 = row.get(4)?;
Ok(Notification {
memory_id: row.get(0)?,
namespace: row.get(1)?,
content: row.get(2)?,
importance: row.get(3)?,
created_at: f64_to_datetime(created_at_f64),
subscription_namespace: sub.namespace.clone(),
threshold: sub.min_importance,
})
})?;
for notif in rows.flatten() {
notifications.push(notif);
}
} else {
let mut stmt = self.conn.prepare(
"SELECT id, namespace, content, importance, created_at FROM memories
WHERE importance >= ?"
)?;
let rows = stmt.query_map(params![sub.min_importance], |row| {
let created_at_f64: f64 = row.get(4)?;
Ok(Notification {
memory_id: row.get(0)?,
namespace: row.get(1)?,
content: row.get(2)?,
importance: row.get(3)?,
created_at: f64_to_datetime(created_at_f64),
subscription_namespace: sub.namespace.clone(),
threshold: sub.min_importance,
})
})?;
for notif in rows.flatten() {
notifications.push(notif);
}
}
} else {
if let Some(since_dt) = since {
let mut stmt = self.conn.prepare(
"SELECT id, namespace, content, importance, created_at FROM memories
WHERE created_at > ? AND importance >= ? AND namespace = ?"
)?;
let rows = stmt.query_map(
params![datetime_to_f64(since_dt), sub.min_importance, &sub.namespace],
|row| {
let created_at_f64: f64 = row.get(4)?;
Ok(Notification {
memory_id: row.get(0)?,
namespace: row.get(1)?,
content: row.get(2)?,
importance: row.get(3)?,
created_at: f64_to_datetime(created_at_f64),
subscription_namespace: sub.namespace.clone(),
threshold: sub.min_importance,
})
}
)?;
for notif in rows.flatten() {
notifications.push(notif);
}
} else {
let mut stmt = self.conn.prepare(
"SELECT id, namespace, content, importance, created_at FROM memories
WHERE importance >= ? AND namespace = ?"
)?;
let rows = stmt.query_map(
params![sub.min_importance, &sub.namespace],
|row| {
let created_at_f64: f64 = row.get(4)?;
Ok(Notification {
memory_id: row.get(0)?,
namespace: row.get(1)?,
content: row.get(2)?,
importance: row.get(3)?,
created_at: f64_to_datetime(created_at_f64),
subscription_namespace: sub.namespace.clone(),
threshold: sub.min_importance,
})
}
)?;
for notif in rows.flatten() {
notifications.push(notif);
}
}
}
Ok(notifications)
}
pub fn check_notifications(
&self,
agent_id: &str,
) -> Result<Vec<Notification>, Box<dyn std::error::Error>> {
let last_checked: Option<f64> = self.conn
.query_row(
"SELECT last_checked FROM notification_cursor WHERE agent_id = ?",
params![agent_id],
|row| row.get(0),
)
.ok();
let last_checked_dt = last_checked.map(f64_to_datetime);
let subscriptions = self.list_subscriptions(agent_id)?;
if subscriptions.is_empty() {
return Ok(vec![]);
}
let mut notifications = Vec::new();
for sub in &subscriptions {
let sub_notifs = self.query_notifications_for_sub(sub, last_checked_dt.as_ref())?;
notifications.extend(sub_notifs);
}
self.conn.execute(
"INSERT OR REPLACE INTO notification_cursor (agent_id, last_checked) VALUES (?, ?)",
params![agent_id, now_f64()],
)?;
notifications.sort_by(|a, b| a.memory_id.cmp(&b.memory_id));
notifications.dedup_by(|a, b| a.memory_id == b.memory_id);
notifications.sort_by(|a, b| b.created_at.cmp(&a.created_at));
Ok(notifications)
}
pub fn peek_notifications(
&self,
agent_id: &str,
) -> Result<Vec<Notification>, Box<dyn std::error::Error>> {
let last_checked: Option<f64> = self.conn
.query_row(
"SELECT last_checked FROM notification_cursor WHERE agent_id = ?",
params![agent_id],
|row| row.get(0),
)
.ok();
let last_checked_dt = last_checked.map(f64_to_datetime);
let subscriptions = self.list_subscriptions(agent_id)?;
if subscriptions.is_empty() {
return Ok(vec![]);
}
let mut notifications = Vec::new();
for sub in &subscriptions {
let sub_notifs = self.query_notifications_for_sub(sub, last_checked_dt.as_ref())?;
notifications.extend(sub_notifs);
}
notifications.sort_by(|a, b| a.memory_id.cmp(&b.memory_id));
notifications.dedup_by(|a, b| a.memory_id == b.memory_id);
notifications.sort_by(|a, b| b.created_at.cmp(&a.created_at));
Ok(notifications)
}
pub fn reset_cursor(&self, agent_id: &str) -> Result<(), Box<dyn std::error::Error>> {
self.conn.execute(
"DELETE FROM notification_cursor WHERE agent_id = ?",
params![agent_id],
)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens an in-memory database containing only the `memories` table that
    /// the notification queries read from.
    fn setup_test_db() -> Connection {
        let db = Connection::open_in_memory().unwrap();
        db.execute_batch(
            r#"
CREATE TABLE memories (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
memory_type TEXT NOT NULL,
layer TEXT NOT NULL,
created_at REAL NOT NULL,
working_strength REAL NOT NULL DEFAULT 1.0,
core_strength REAL NOT NULL DEFAULT 0.0,
importance REAL NOT NULL DEFAULT 0.3,
pinned INTEGER NOT NULL DEFAULT 0,
consolidation_count INTEGER NOT NULL DEFAULT 0,
last_consolidated REAL,
source TEXT DEFAULT '',
contradicts TEXT DEFAULT '',
contradicted_by TEXT DEFAULT '',
metadata TEXT,
namespace TEXT NOT NULL DEFAULT 'default'
);
"#,
        )
        .unwrap();
        db
    }

    /// Builds a manager on `db`, panicking if the schema cannot be created.
    fn manager_for(db: &Connection) -> SubscriptionManager<'_> {
        SubscriptionManager::new(db).unwrap()
    }

    /// Executes a single parameterless statement, panicking on failure.
    fn run_sql(db: &Connection, sql: &str) {
        db.execute(sql, []).unwrap();
    }

    #[test]
    fn test_subscribe_unsubscribe() {
        let db = setup_test_db();
        let manager = manager_for(&db);

        manager.subscribe("ceo", "trading", 0.8).unwrap();
        let active = manager.list_subscriptions("ceo").unwrap();
        assert_eq!(active.len(), 1);
        assert_eq!(active[0].namespace, "trading");
        assert!((active[0].min_importance - 0.8).abs() < 0.01);

        // Removing the only subscription reports success and leaves none behind.
        let was_removed = manager.unsubscribe("ceo", "trading").unwrap();
        assert!(was_removed);
        let remaining = manager.list_subscriptions("ceo").unwrap();
        assert!(remaining.is_empty());
    }

    #[test]
    fn test_subscribe_wildcard() {
        let db = setup_test_db();
        let manager = manager_for(&db);

        manager.subscribe("ceo", "*", 0.9).unwrap();

        let active = manager.list_subscriptions("ceo").unwrap();
        assert_eq!(active.len(), 1);
        assert_eq!(active[0].namespace, "*");
    }

    #[test]
    fn test_notifications_basic() {
        let db = setup_test_db();
        let manager = manager_for(&db);
        manager.subscribe("ceo", "trading", 0.7).unwrap();
        run_sql(
            &db,
            "INSERT INTO memories (id, content, memory_type, layer, created_at, importance, namespace)
VALUES ('m1', 'Oil price spike', 'factual', 'working', strftime('%s','now'), 0.9, 'trading')",
        );

        let first_check = manager.check_notifications("ceo").unwrap();
        assert_eq!(first_check.len(), 1);
        assert_eq!(first_check[0].memory_id, "m1");
        assert_eq!(first_check[0].namespace, "trading");

        // The first check advanced the cursor, so nothing is new any more.
        let second_check = manager.check_notifications("ceo").unwrap();
        assert!(second_check.is_empty());
    }

    #[test]
    fn test_notifications_threshold() {
        let db = setup_test_db();
        let manager = manager_for(&db);
        manager.subscribe("ceo", "trading", 0.8).unwrap();
        // Importance 0.3 is below the 0.8 threshold and must not notify.
        run_sql(
            &db,
            "INSERT INTO memories (id, content, memory_type, layer, created_at, importance, namespace)
VALUES ('m1', 'Minor update', 'factual', 'working', strftime('%s','now'), 0.3, 'trading')",
        );

        let pending = manager.check_notifications("ceo").unwrap();
        assert!(pending.is_empty());
    }

    #[test]
    fn test_notifications_wildcard() {
        let db = setup_test_db();
        let manager = manager_for(&db);
        manager.subscribe("ceo", "*", 0.8).unwrap();
        // Two memories in different namespaces, both above the threshold.
        run_sql(
            &db,
            "INSERT INTO memories (id, content, memory_type, layer, created_at, importance, namespace)
VALUES ('m1', 'Trading alert', 'factual', 'working', strftime('%s','now'), 0.9, 'trading')",
        );
        run_sql(
            &db,
            "INSERT INTO memories (id, content, memory_type, layer, created_at, importance, namespace)
VALUES ('m2', 'Engine alert', 'factual', 'working', strftime('%s','now'), 0.85, 'engine')",
        );

        let pending = manager.check_notifications("ceo").unwrap();
        assert_eq!(pending.len(), 2);
    }

    #[test]
    fn test_peek_notifications() {
        let db = setup_test_db();
        let manager = manager_for(&db);
        manager.subscribe("ceo", "trading", 0.7).unwrap();
        run_sql(
            &db,
            "INSERT INTO memories (id, content, memory_type, layer, created_at, importance, namespace)
VALUES ('m1', 'Test', 'factual', 'working', strftime('%s','now'), 0.9, 'trading')",
        );

        // Peeking never moves the cursor, so it keeps returning the same item.
        let first_peek = manager.peek_notifications("ceo").unwrap();
        assert_eq!(first_peek.len(), 1);
        let second_peek = manager.peek_notifications("ceo").unwrap();
        assert_eq!(second_peek.len(), 1);

        // A real check still sees it once, then consumes it.
        let first_check = manager.check_notifications("ceo").unwrap();
        assert_eq!(first_check.len(), 1);
        let second_check = manager.check_notifications("ceo").unwrap();
        assert!(second_check.is_empty());
    }
}