use anyhow::{Context, Result};
use redb::{Database, ReadableTable, ReadableTableMetadata, TableDefinition};
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
/// Row-count cap for the activity table: `ActivityLog::prune` removes the
/// lowest-keyed rows while the count exceeds this value.
pub const MAX_ENTRIES: u64 = 100_000;
/// Upper bound on the number of rows removed by a single prune write transaction.
const EVICTION_BATCH: u64 = 256;
/// File name of the redb database; `ActivityLog::open` joins it onto the data root.
pub const ACTIVITY_DB_FILENAME: &str = "activity.redb";
/// Origin of an activity entry.
///
/// Serde renames the variants to snake_case, so `Mcp` serializes as `"mcp"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ActivitySource {
/// String form `"http"`.
Http,
/// String form `"mcp"`.
Mcp,
/// String form `"hook"`.
Hook,
}
impl ActivitySource {
    /// Returns the canonical lowercase name of this source.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Hook => "hook",
            Self::Mcp => "mcp",
            Self::Http => "http",
        }
    }

    /// Parses a source name, ignoring surrounding whitespace and ASCII case.
    ///
    /// Returns `None` when the trimmed text is not `http`, `mcp` or `hook`.
    pub fn parse(s: &str) -> Option<Self> {
        let name = s.trim();
        [Self::Http, Self::Mcp, Self::Hook]
            .iter()
            .copied()
            .find(|candidate| name.eq_ignore_ascii_case(candidate.as_str()))
    }
}
/// One recorded activity row; it is stored JSON-encoded in the activity table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActivityEntry {
/// Table key under which this entry is stored.
pub id: u64,
/// Set to the current UTC time when the entry is appended.
pub timestamp: chrono::DateTime<chrono::Utc>,
/// Which source recorded the entry.
pub source: ActivitySource,
/// Optional palace identifier supplied by the caller.
pub palace_id: Option<String>,
/// Caller-supplied event name (the tests use values such as `"drawer_added"`).
pub event_type: String,
/// Caller payload, serialized to a JSON string at append time.
pub payload: String,
}
/// redb table named `"activity"`: entry id -> JSON-encoded `ActivityEntry` bytes.
const ACTIVITY_TABLE: TableDefinition<u64, Vec<u8>> = TableDefinition::new("activity");
/// Criteria applied by `ActivityLog::list`.
///
/// A `None` field places no constraint; an entry must satisfy every field that is set.
#[derive(Debug, Default, Clone)]
pub struct ActivityFilter {
/// Keep only entries whose `palace_id` equals this value.
pub palace_id: Option<String>,
/// Keep only entries recorded by this source.
pub source: Option<ActivitySource>,
/// Inclusive lower bound on the entry timestamp.
pub since: Option<chrono::DateTime<chrono::Utc>>,
/// Inclusive upper bound on the entry timestamp.
pub until: Option<chrono::DateTime<chrono::Utc>>,
}
/// Handle to the activity log.
///
/// Clones share the same database and id counter, because both sit behind `Arc`.
#[derive(Clone)]
pub enum ActivityLog {
/// Persists entries in a redb database.
Redb {
/// Shared database handle.
db: Arc<Database>,
/// Next id handed out by `alloc_id`.
next_id: Arc<AtomicU64>,
},
/// Drops every write and returns empty results for reads.
Discard,
}
impl ActivityLog {
/// Opens the activity database under `data_root`, creating the directory,
/// the database file and the activity table when they do not exist yet.
///
/// The id counter is seeded with one more than the highest key currently in
/// the table (or `1` for an empty table).
///
/// # Errors
/// Returns an error if the directory cannot be created or any database
/// operation fails.
pub fn open(data_root: &Path) -> Result<Self> {
    std::fs::create_dir_all(data_root)
        .with_context(|| format!("create activity dir {}", data_root.display()))?;

    let db_path = data_root.join(ACTIVITY_DB_FILENAME);
    let database = Database::create(&db_path)
        .with_context(|| format!("open activity db {}", db_path.display()))?;

    // Open the table once inside a write transaction so that the read
    // transaction below can open it even on a brand-new database.
    let init_txn = database
        .begin_write()
        .context("begin_write to init activity")?;
    drop(
        init_txn
            .open_table(ACTIVITY_TABLE)
            .context("open_table activity")?,
    );
    init_txn.commit().context("commit init activity")?;

    let read_txn = database
        .begin_read()
        .context("begin_read to seed activity next_id")?;
    let table = read_txn
        .open_table(ACTIVITY_TABLE)
        .context("open_table activity (read)")?;
    let highest_id = table
        .last()
        .context("read last activity row")?
        .map_or(0, |(key, _)| key.value());
    // Release the table and transaction before the database is moved into the Arc.
    drop(table);
    drop(read_txn);

    let first_free_id = highest_id.saturating_add(1);
    Ok(Self::Redb {
        db: Arc::new(database),
        next_id: Arc::new(AtomicU64::new(first_free_id)),
    })
}
pub fn discard() -> Self {
Self::Discard
}
/// Returns `true` when this handle is the `Discard` variant.
pub fn is_discard(&self) -> bool {
    match self {
        Self::Discard => true,
        Self::Redb { .. } => false,
    }
}
/// Reserves the next entry id.
///
/// The `Redb` variant atomically increments the shared counter and returns
/// its previous value; the `Discard` variant always returns `0`.
pub fn alloc_id(&self) -> u64 {
    if let Self::Redb {
        next_id: counter, ..
    } = self
    {
        counter.fetch_add(1, Ordering::SeqCst)
    } else {
        0
    }
}
/// Stores one entry under the caller-supplied `id` and then prunes the table.
///
/// The payload is serialized to a JSON string and the entry is stamped with
/// the current UTC time. On the `Discard` variant nothing is serialized or
/// written and `Ok(0)` is returned.
///
/// # Errors
/// Fails if serialization or any database step fails. The entry is committed
/// before pruning runs, so an error coming from the prune step does not undo
/// the insert.
pub fn append_with_id(
    &self,
    id: u64,
    source: ActivitySource,
    palace_id: Option<String>,
    event_type: impl Into<String>,
    payload: impl Serialize,
) -> Result<u64> {
    let Self::Redb { db, .. } = self else {
        return Ok(0);
    };

    let encoded_payload =
        serde_json::to_string(&payload).context("serialize activity payload")?;
    let record = ActivityEntry {
        id,
        timestamp: chrono::Utc::now(),
        source,
        palace_id,
        event_type: event_type.into(),
        payload: encoded_payload,
    };
    let encoded_record = serde_json::to_vec(&record).context("serialize activity entry")?;

    let txn = db.begin_write().context("begin_write activity")?;
    let mut table = txn
        .open_table(ACTIVITY_TABLE)
        .context("open_table activity (append)")?;
    table
        .insert(&id, &encoded_record)
        .context("insert activity entry")?;
    // The table borrows the transaction, so it must go before the commit.
    drop(table);
    txn.commit().context("commit activity append")?;

    self.prune()?;
    Ok(id)
}
/// Allocates a fresh id and stores the entry under it.
///
/// Returns the id that was used (always `0` on the `Discard` variant).
pub fn append(
    &self,
    source: ActivitySource,
    palace_id: Option<String>,
    event_type: impl Into<String>,
    payload: impl Serialize,
) -> Result<u64> {
    self.append_with_id(self.alloc_id(), source, palace_id, event_type, payload)
}
/// Removes the lowest-keyed rows until the table holds at most `MAX_ENTRIES`.
///
/// Rows are removed in batches of at most `EVICTION_BATCH`, one write
/// transaction per batch. The `Discard` variant is a no-op.
///
/// # Errors
/// Returns an error if counting, reading keys, removing rows or committing
/// fails. A storage error while reading keys is propagated rather than
/// skipped: skipping it could leave a batch empty while the count stays above
/// the cap, so the loop would never finish.
pub fn prune(&self) -> Result<()> {
    let db = match self {
        Self::Redb { db, .. } => db,
        Self::Discard => return Ok(()),
    };
    loop {
        let count = self.count()?;
        if count <= MAX_ENTRIES {
            return Ok(());
        }
        // Bounded by EVICTION_BATCH (256), so the cast below cannot truncate.
        let to_drop = (count - MAX_ENTRIES).min(EVICTION_BATCH);
        let write = db.begin_write().context("begin_write activity (prune)")?;
        {
            let mut table = write
                .open_table(ACTIVITY_TABLE)
                .context("open_table activity (prune)")?;
            // Collect the keys first: the iterator borrows the table, and the
            // removals below need it mutably.
            let oldest = table
                .iter()
                .context("iter activity for prune")?
                .take(to_drop as usize)
                .map(|row| row.map(|(key, _)| key.value()))
                .collect::<Result<Vec<u64>, _>>()
                .context("read activity key for prune")?;
            for id in oldest {
                table.remove(&id).context("remove activity entry")?;
            }
        }
        write.commit().context("commit activity prune")?;
    }
}
/// Returns the number of rows currently stored (`0` for the `Discard` variant).
///
/// # Errors
/// Returns an error if the read transaction, the table open or the length
/// query fails.
pub fn count(&self) -> Result<u64> {
    let Self::Redb { db, .. } = self else {
        return Ok(0);
    };
    let txn = db.begin_read().context("begin_read activity count")?;
    let table = txn
        .open_table(ACTIVITY_TABLE)
        .context("open_table activity (count)")?;
    let rows = table.len().context("table.len activity")?;
    Ok(rows)
}
/// Returns entries matching `filter`, highest id first.
///
/// The first `offset` matching entries are skipped and at most `limit`
/// entries are returned; `limit == 0` yields an empty list. Rows whose bytes
/// cannot be deserialized are logged with `tracing::warn!` and skipped. The
/// `Discard` variant always returns an empty list.
///
/// # Errors
/// Returns an error if the read transaction cannot be started, the table
/// cannot be opened, or a row cannot be read from storage.
pub fn list(
    &self,
    filter: &ActivityFilter,
    limit: usize,
    offset: usize,
) -> Result<Vec<ActivityEntry>> {
    let db = match self {
        Self::Redb { db, .. } => db,
        Self::Discard => return Ok(Vec::new()),
    };
    // Without this guard the loop below would push one entry before it
    // checks the limit, so a zero limit would still return a row.
    if limit == 0 {
        return Ok(Vec::new());
    }
    let read = db.begin_read().context("begin_read activity list")?;
    let table = read
        .open_table(ACTIVITY_TABLE)
        .context("open_table activity (list)")?;
    let mut out: Vec<ActivityEntry> = Vec::with_capacity(limit.min(256));
    let mut skipped: usize = 0;
    for row in table.iter().context("iter activity (list)")?.rev() {
        // Storage failures are reported to the caller instead of being
        // silently dropped from the result.
        let (_, bytes) = row.context("read activity row (list)")?;
        let entry: ActivityEntry = match serde_json::from_slice(bytes.value().as_slice()) {
            Ok(e) => e,
            Err(e) => {
                tracing::warn!("activity entry deserialize failed: {e}");
                continue;
            }
        };
        if !entry_matches(&entry, filter) {
            continue;
        }
        if skipped < offset {
            skipped += 1;
            continue;
        }
        out.push(entry);
        if out.len() >= limit {
            break;
        }
    }
    Ok(out)
}
}
/// Reports whether `entry` satisfies every criterion that is set in `filter`.
///
/// Unset (`None`) criteria always pass. Both time bounds are inclusive, and a
/// palace filter never matches an entry that has no palace id.
fn entry_matches(entry: &ActivityEntry, filter: &ActivityFilter) -> bool {
    let palace_ok = match filter.palace_id.as_deref() {
        Some(wanted) => entry.palace_id.as_deref() == Some(wanted),
        None => true,
    };
    let source_ok = filter.source.map_or(true, |wanted| entry.source == wanted);
    let not_too_old = filter.since.map_or(true, |lower| entry.timestamp >= lower);
    let not_too_new = filter.until.map_or(true, |upper| entry.timestamp <= upper);
    palace_ok && source_ok && not_too_old && not_too_new
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
/// Opens a log inside a new temporary directory.
///
/// The `TempDir` is handed back so the caller can keep the directory alive
/// for as long as the log is in use.
fn fresh_log() -> (ActivityLog, tempfile::TempDir) {
    let dir = tempfile::tempdir().expect("tempdir");
    let opened = ActivityLog::open(dir.path()).expect("open activity log");
    (opened, dir)
}
/// `parse` tolerates whitespace and case; `as_str` yields the lowercase names.
#[test]
fn activity_source_parse_and_back() {
    let parse_cases = [
        ("http", Some(ActivitySource::Http)),
        (" MCP ", Some(ActivitySource::Mcp)),
        ("Hook", Some(ActivitySource::Hook)),
        ("nope", None),
    ];
    for (input, expected) in parse_cases {
        assert_eq!(ActivitySource::parse(input), expected);
    }

    let name_cases = [
        (ActivitySource::Http, "http"),
        (ActivitySource::Mcp, "mcp"),
        (ActivitySource::Hook, "hook"),
    ];
    for (source, expected) in name_cases {
        assert_eq!(source.as_str(), expected);
    }
}
/// Every variant survives a JSON round trip, and `Mcp` encodes as `"mcp"`.
#[test]
fn activity_source_round_trips_via_serde() {
    let variants = [
        ActivitySource::Http,
        ActivitySource::Mcp,
        ActivitySource::Hook,
    ];
    for original in variants {
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ActivitySource = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    let mcp_json = serde_json::to_string(&ActivitySource::Mcp).unwrap();
    assert_eq!(mcp_json, "\"mcp\"");
}
/// An entry encoded to JSON bytes decodes back with the same field values.
#[test]
fn entry_serde_round_trip() {
    let original = ActivityEntry {
        id: 7,
        timestamp: chrono::Utc::now(),
        source: ActivitySource::Mcp,
        palace_id: Some("alpha".to_string()),
        event_type: "drawer_added".to_string(),
        payload: "{\"a\":1}".to_string(),
    };

    let encoded = serde_json::to_vec(&original).unwrap();
    let decoded: ActivityEntry = serde_json::from_slice(&encoded).unwrap();

    assert_eq!(decoded.id, original.id);
    assert_eq!(decoded.source, original.source);
    assert_eq!(decoded.palace_id, original.palace_id);
    assert_eq!(decoded.event_type, original.event_type);
    assert_eq!(decoded.payload, original.payload);
}
/// Opening a log creates the database file inside the data root.
#[test]
fn activity_log_open_creates_db_file() {
    let dir = tempfile::tempdir().expect("tempdir");
    let _log = ActivityLog::open(dir.path()).expect("open");
    let db_file = dir.path().join(ACTIVITY_DB_FILENAME);
    assert!(db_file.is_file());
}
/// Consecutive appends get consecutive ids, and `list` returns the newer one first.
#[test]
fn appends_assign_monotonic_ids() {
    let (log, _tmp) = fresh_log();

    let first = log
        .append(
            ActivitySource::Http,
            Some("p1".into()),
            "drawer_added",
            json!({"x": 1}),
        )
        .unwrap();
    let second = log
        .append(
            ActivitySource::Mcp,
            Some("p1".into()),
            "drawer_added",
            json!({"x": 2}),
        )
        .unwrap();
    assert_eq!(second, first + 1);

    let listed = log.list(&ActivityFilter::default(), 10, 0).unwrap();
    assert_eq!(listed.len(), 2);
    assert_eq!(listed[0].id, second);
    assert_eq!(listed[1].id, first);
}
/// After closing and reopening the database, new ids continue above the old ones.
#[test]
fn next_id_resumes_from_max_after_reopen() {
    let tmp = tempfile::tempdir().expect("tempdir");
    // Each call opens the log, appends one entry and drops the log again, so
    // the second call really reopens the same database file.
    let append_after_open = || {
        let log = ActivityLog::open(tmp.path()).unwrap();
        log.append(ActivitySource::Http, None, "palace_created", json!({}))
            .unwrap()
    };

    let id_first = append_after_open();
    let id_second = append_after_open();
    assert!(id_second > id_first, "{id_second} must exceed {id_first}");
}
/// `list` yields ids in descending order.
#[test]
fn list_returns_newest_first() {
    let (log, _tmp) = fresh_log();
    for i in 0..5 {
        let palace = Some(format!("p{i}"));
        log.append(ActivitySource::Http, palace, "drawer_added", json!({"i": i}))
            .unwrap();
    }

    let ids: Vec<u64> = log
        .list(&ActivityFilter::default(), 10, 0)
        .unwrap()
        .into_iter()
        .map(|entry| entry.id)
        .collect();

    let mut descending = ids.clone();
    descending.sort_unstable();
    descending.reverse();
    assert_eq!(ids, descending);
}
/// Two adjacent pages each hold `limit` entries and share no ids.
#[test]
fn list_paginates_via_limit_and_offset() {
    let (log, _tmp) = fresh_log();
    for i in 0..10 {
        log.append(ActivitySource::Http, None, "x", json!({"i": i}))
            .unwrap();
    }

    let everything = ActivityFilter::default();
    let first_page = log.list(&everything, 3, 0).unwrap();
    let second_page = log.list(&everything, 3, 3).unwrap();
    assert_eq!(first_page.len(), 3);
    assert_eq!(second_page.len(), 3);

    let first_ids: Vec<u64> = first_page.iter().map(|entry| entry.id).collect();
    let overlap = second_page
        .iter()
        .any(|entry| first_ids.contains(&entry.id));
    assert!(!overlap);
}
/// Source, palace and time criteria each narrow the result, and they combine.
#[test]
fn list_filters_by_source_palace_and_time() {
    let (log, _tmp) = fresh_log();
    let seed = [
        (ActivitySource::Http, Some("alpha"), "a"),
        (ActivitySource::Mcp, Some("alpha"), "a"),
        (ActivitySource::Mcp, Some("beta"), "a"),
        (ActivitySource::Http, None, "dream_completed"),
    ];
    for (source, palace, event) in seed {
        log.append(source, palace.map(String::from), event, json!({}))
            .unwrap();
    }
    let query = |filter: ActivityFilter| log.list(&filter, 10, 0).unwrap();

    let mcp_only = query(ActivityFilter {
        source: Some(ActivitySource::Mcp),
        ..Default::default()
    });
    assert_eq!(mcp_only.len(), 2);
    assert!(mcp_only.iter().all(|e| e.source == ActivitySource::Mcp));

    let alpha = query(ActivityFilter {
        palace_id: Some("alpha".into()),
        ..Default::default()
    });
    assert_eq!(alpha.len(), 2);
    assert!(alpha
        .iter()
        .all(|e| e.palace_id.as_deref() == Some("alpha")));

    let yesterday = chrono::Utc::now() - chrono::Duration::days(1);
    let none = query(ActivityFilter {
        until: Some(yesterday),
        ..Default::default()
    });
    assert!(none.is_empty(), "until=yesterday should match nothing");

    let mcp_alpha = query(ActivityFilter {
        source: Some(ActivitySource::Mcp),
        palace_id: Some("alpha".into()),
        ..Default::default()
    });
    assert_eq!(mcp_alpha.len(), 1);
}
/// The `Discard` log accepts every call, stores nothing and always reports id 0.
#[test]
fn discard_variant_drops_writes_and_returns_empty_reads() {
    let log = ActivityLog::discard();
    assert!(log.is_discard(), "expected Discard variant");

    let first_id = log
        .append(ActivitySource::Http, None, "drawer_added", json!({"x": 1}))
        .expect("discard append must succeed");
    assert_eq!(first_id, 0, "discard always returns id 0");
    assert_eq!(log.count().expect("discard count"), 0);

    let entries = log
        .list(&ActivityFilter::default(), 10, 0)
        .expect("discard list");
    assert!(entries.is_empty(), "discard list must be empty");

    log.prune().expect("discard prune");

    let second_id = log
        .append(ActivitySource::Mcp, Some("p".into()), "x", json!({}))
        .expect("discard append (second)");
    assert_eq!(second_id, 0);
    assert_eq!(log.count().expect("discard count after writes"), 0);
}
/// Pruning a table that is below the cap removes nothing.
///
/// NOTE(review): ten rows is far below `MAX_ENTRIES`, so despite its name this
/// test never triggers an eviction; it only checks the under-cap path.
#[test]
fn appends_evict_oldest_when_capped() {
    let (log, _tmp) = fresh_log();
    (0..10).for_each(|_| {
        log.append(ActivitySource::Http, None, "x", json!({}))
            .unwrap();
    });

    let before = log.count().unwrap();
    assert_eq!(before, 10);

    log.prune().unwrap();

    let after = log.count().unwrap();
    assert_eq!(after, 10);
}
}