use std::sync::Arc;
use serde::{Deserialize, Serialize};
use crate::commit::{CommittedEntry, MutationCommitter, TenantId};
/// Caller-supplied paging parameters for [`read_history`].
///
/// Both fields are optional. `#[serde(default)]` leaves a field as `None`
/// when it is absent from the deserialized input; the accessor methods on
/// this type turn the `None`s into concrete values.
#[derive(Debug, Clone, Deserialize)]
pub struct HistoryParams {
    /// Log index to start reading from. `None` is treated as `0` by
    /// [`HistoryParams::from_index`].
    // NOTE(review): the tests in this file read from index 7 and get entry 7
    // back first, i.e. the start looks inclusive — confirm against the
    // committer's `read_range` contract.
    #[serde(default)]
    pub from: Option<u64>,
    /// Requested maximum number of entries. `None` is treated as `100`, and
    /// any value is capped at [`MAX_HISTORY_LIMIT`], by
    /// [`HistoryParams::effective_limit`].
    #[serde(default)]
    pub limit: Option<usize>,
}
impl HistoryParams {
    /// Returns the log index the read should start at.
    ///
    /// Falls back to `0` when the caller did not supply `from`.
    pub fn from_index(&self) -> u64 {
        // `u64::default()` is `0`, which is the documented fallback.
        self.from.unwrap_or_default()
    }

    /// Returns the page size that will actually be used for the read.
    ///
    /// A missing `limit` is treated as `100`. Whatever the caller asked for,
    /// the result never exceeds [`MAX_HISTORY_LIMIT`].
    pub fn effective_limit(&self) -> usize {
        let requested = self.limit.unwrap_or(100);
        std::cmp::min(requested, MAX_HISTORY_LIMIT)
    }
}
/// Upper bound on the number of entries a single history read may request.
///
/// [`HistoryParams::effective_limit`] clamps the caller-supplied `limit` to
/// this value.
pub const MAX_HISTORY_LIMIT: usize = 1000;
/// Serializable result of [`read_history`]: one page of a tenant's committed
/// entries plus the parameters that were actually applied.
#[derive(Debug, Serialize)]
pub struct HistoryResponse {
    /// Tenant the read was performed for.
    pub tenant_id: TenantId,
    /// Start index that was used for the read (`0` if the request gave none).
    pub from_index: u64,
    /// Limit that was used for the read, after defaulting and capping.
    pub limit: usize,
    /// Value reported by the committer's `high_watermark` for this tenant.
    /// It is fetched after `entries`, in a separate call.
    pub high_watermark: u64,
    /// Entries returned by the committer's `read_range` for this request.
    pub entries: Vec<CommittedEntry>,
}
/// Reads one page of a tenant's committed log.
///
/// The start index and page size are taken from `params` via
/// [`HistoryParams::from_index`] and [`HistoryParams::effective_limit`], so
/// the caller's raw values are defaulted and capped before the committer is
/// asked for anything. The resolved values are echoed back in the response.
///
/// # Errors
///
/// Propagates any error returned by the committer's `read_range` or
/// `high_watermark` call.
pub async fn read_history(
    committer: &Arc<dyn MutationCommitter>,
    tenant_id: TenantId,
    params: &HistoryParams,
) -> Result<HistoryResponse, crate::commit::CommitError> {
    let start = params.from_index();
    let page_size = params.effective_limit();

    // The entries are read first and the watermark second; keep this order.
    // NOTE(review): these are two independent calls, so the watermark may
    // already be past the last returned entry — confirm whether the committer
    // offers any snapshot guarantee across them.
    let page = committer.read_range(tenant_id, start, page_size).await?;
    let watermark = committer.high_watermark(tenant_id).await?;

    Ok(HistoryResponse {
        tenant_id,
        from_index: start,
        limit: page_size,
        high_watermark: watermark,
        entries: page,
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commit::{CommitOptions, LocalSqliteCommitter, MemoryMutation};

    /// Builds an `UpsertMemory` mutation whose `rid` and `text` are derived
    /// from `tag`; every other field is a fixed placeholder value.
    fn upsert(tag: &str) -> MemoryMutation {
        let rid = format!("mem_{tag}");
        MemoryMutation::UpsertMemory {
            rid,
            text: tag.into(),
            memory_type: "semantic".into(),
            importance: 0.5,
            valence: 0.0,
            half_life: 168.0,
            namespace: "default".into(),
            certainty: 1.0,
            domain: "general".into(),
            source: "user".into(),
            emotional_state: None,
            embedding: None,
            metadata: serde_json::json!({}),
            extracted_entities: vec![],
            created_at_unix_micros: None,
            embedding_model: None,
        }
    }

    /// Opens a fresh in-memory SQLite committer behind the trait object that
    /// `read_history` takes.
    fn in_memory_committer() -> Arc<dyn MutationCommitter> {
        Arc::new(LocalSqliteCommitter::open_in_memory().unwrap())
    }

    /// Commits `count` upserts tagged `e1`, `e2`, ... for `tenant`, in order.
    async fn commit_entries(
        committer: &Arc<dyn MutationCommitter>,
        tenant: TenantId,
        count: u32,
    ) {
        for n in 1..=count {
            committer
                .commit(tenant, upsert(&format!("e{n}")), CommitOptions::new())
                .await
                .unwrap();
        }
    }

    #[tokio::test]
    async fn read_history_returns_full_log_for_small_tenant() {
        let committer = in_memory_committer();
        let tenant = TenantId::new(1);
        commit_entries(&committer, tenant, 5).await;

        let resp = read_history(&committer, tenant, &HistoryParams::default())
            .await
            .unwrap();

        assert_eq!(resp.tenant_id, tenant);
        assert_eq!(resp.from_index, 0);
        assert_eq!(resp.high_watermark, 5);
        assert_eq!(resp.entries.len(), 5);
        // Entries are expected back in commit order with indices 1..=5.
        for (entry, expected_index) in resp.entries.iter().zip(1u64..) {
            assert_eq!(entry.log_index, expected_index);
        }
    }

    #[tokio::test]
    async fn limit_is_capped_at_max_history_limit() {
        let committer = in_memory_committer();
        let oversized = HistoryParams {
            from: Some(0),
            limit: Some(99_999_999),
        };

        let resp = read_history(&committer, TenantId::new(1), &oversized)
            .await
            .unwrap();

        assert_eq!(resp.limit, MAX_HISTORY_LIMIT);
    }

    #[tokio::test]
    async fn read_history_respects_from_index() {
        let committer = in_memory_committer();
        let tenant = TenantId::new(1);
        commit_entries(&committer, tenant, 10).await;
        let from_seven = HistoryParams {
            from: Some(7),
            limit: Some(100),
        };

        let resp = read_history(&committer, tenant, &from_seven)
            .await
            .unwrap();

        // Starting at 7 out of 10 committed entries yields 7, 8, 9, 10.
        assert_eq!(resp.entries.len(), 4);
        assert_eq!(resp.entries[0].log_index, 7);
        assert_eq!(resp.entries[3].log_index, 10);
        assert_eq!(resp.high_watermark, 10);
    }

    #[tokio::test]
    async fn read_history_for_unknown_tenant_returns_empty() {
        let committer = in_memory_committer();

        let resp = read_history(&committer, TenantId::new(999), &HistoryParams::default())
            .await
            .unwrap();

        assert!(resp.entries.is_empty());
        assert_eq!(resp.high_watermark, 0);
    }
}
/// The default request carries no explicit `from` and no explicit `limit`,
/// so the accessor methods on [`HistoryParams`] supply their fallbacks.
impl Default for HistoryParams {
    fn default() -> Self {
        let (from, limit) = (None, None);
        Self { from, limit }
    }
}