use std::sync::Arc;
use async_trait::async_trait;
use super::applier::{Applier, ApplyError};
use super::local::LocalSqliteCommitter;
use super::mutation::{MemoryMutation, TenantId};
use super::trait_def::{
CommitError, CommitOptions, CommitReceipt, CommittedEntry, MutationCommitter,
};
/// Tenant-scoped entry point for committing [`MemoryMutation`]s and reading
/// the committed log back.
///
/// The `Send + Sync` bound lets implementations be shared across tasks, e.g.
/// behind an `Arc<dyn Submitter>`.
#[async_trait]
pub trait Submitter: Send + Sync {
    /// Commits `mutation` on behalf of `tenant_id`, honouring `opts`.
    ///
    /// # Errors
    ///
    /// Returns a [`CommitError`] when the mutation could not be committed.
    async fn submit(
        &self,
        tenant_id: TenantId,
        mutation: MemoryMutation,
        opts: CommitOptions,
    ) -> Result<CommitReceipt, CommitError>;

    /// Reads at most `limit` committed entries for `tenant_id`, starting at
    /// `from_index`.
    ///
    /// NOTE(review): the tests in this file read from index 1 and get the
    /// entry with `log_index == 1`, so `from_index` appears to be inclusive —
    /// confirm against the committer's contract.
    async fn read_range(
        &self,
        tenant_id: TenantId,
        from_index: u64,
        limit: usize,
    ) -> Result<Vec<CommittedEntry>, CommitError>;

    /// Returns the high watermark of `tenant_id`'s log.
    ///
    /// With the local implementation in this file this is `0` for a tenant
    /// that has never written and grows by one per committed entry.
    async fn high_watermark(&self, tenant_id: TenantId) -> Result<u64, CommitError>;

    /// Lists the tenants known to the underlying log.
    async fn list_active_tenants(&self) -> Result<Vec<TenantId>, CommitError>;

    /// Hook for implementations that have to establish linearizability before
    /// serving a read.
    ///
    /// The provided default does nothing and always returns `Ok(())`.
    async fn ensure_linearizable(&self) -> Result<(), CommitError> {
        Ok(())
    }
}
/// [`Submitter`] that pairs a [`LocalSqliteCommitter`] with an [`Applier`].
///
/// The committer owns the log; the applier is invoked after a successful
/// commit when the caller asked to wait for apply.
pub struct LocalSqliteSubmitter {
    /// Log backend every commit and read is forwarded to.
    committer: Arc<LocalSqliteCommitter>,
    /// Applies a committed mutation when `wait_for_apply` is requested.
    applier: Arc<dyn Applier>,
}
impl LocalSqliteSubmitter {
pub fn new(committer: Arc<LocalSqliteCommitter>, applier: Arc<dyn Applier>) -> Self {
Self { committer, applier }
}
pub fn committer(&self) -> &Arc<LocalSqliteCommitter> {
&self.committer
}
}
#[async_trait]
impl Submitter for LocalSqliteSubmitter {
    /// Commits `mutation` through the local committer and, when
    /// `opts.wait_for_apply` is set, runs the applier on the committed entry
    /// before returning.
    ///
    /// The mutation is cloned only when the applier will actually run; a
    /// no-wait submit moves it straight into the committer.
    ///
    /// # Errors
    ///
    /// * Any [`CommitError`] produced by the committer is propagated as is.
    /// * An applier failure that is neither `ApplyError::NotYetWired` nor
    ///   reported as benign by `is_idempotent_ok()` is returned as
    ///   `CommitError::StorageFailure`. The entry has already been committed
    ///   by that point.
    async fn submit(
        &self,
        tenant_id: TenantId,
        mutation: MemoryMutation,
        opts: CommitOptions,
    ) -> Result<CommitReceipt, CommitError> {
        // The committer consumes the mutation, so keep a copy for the applier
        // only when the caller asked to wait for apply.
        let pending_apply = opts.wait_for_apply.then(|| mutation.clone());

        let receipt = self.committer.commit(tenant_id, mutation, opts).await?;

        if let Some(committed) = pending_apply {
            let outcome = self
                .applier
                .apply(tenant_id, receipt.log_index, &committed)
                .await;
            match outcome {
                // `NotYetWired` is deliberately treated like success here.
                Ok(()) | Err(ApplyError::NotYetWired { .. }) => {}
                // The applier classifies this error as harmless.
                Err(e) if e.is_idempotent_ok() => {}
                Err(e) => {
                    return Err(CommitError::StorageFailure {
                        message: format!("apply failed for log_index {}: {e}", receipt.log_index),
                    });
                }
            }
        }

        Ok(receipt)
    }

    /// Forwards to the committer's `read_range`.
    async fn read_range(
        &self,
        tenant_id: TenantId,
        from_index: u64,
        limit: usize,
    ) -> Result<Vec<CommittedEntry>, CommitError> {
        self.committer
            .read_range(tenant_id, from_index, limit)
            .await
    }

    /// Forwards to the committer's `high_watermark`.
    async fn high_watermark(&self, tenant_id: TenantId) -> Result<u64, CommitError> {
        self.committer.high_watermark(tenant_id).await
    }

    /// Forwards to the committer's `list_active_tenants`.
    async fn list_active_tenants(&self) -> Result<Vec<TenantId>, CommitError> {
        self.committer.list_active_tenants().await
    }

    /// Forwards to the committer's `ensure_linearizable` instead of using the
    /// trait's no-op default.
    async fn ensure_linearizable(&self) -> Result<(), CommitError> {
        self.committer.ensure_linearizable().await
    }
}
/// Lets a [`LocalSqliteSubmitter`] stand in wherever a [`MutationCommitter`]
/// is expected.
///
/// Every method forwards to the [`Submitter`] implementation, so `commit`
/// goes through `submit` and therefore includes its optional apply step.
/// Calls are fully qualified because both traits declare methods with the
/// same names.
#[async_trait]
impl MutationCommitter for LocalSqliteSubmitter {
    /// Same as [`Submitter::submit`].
    async fn commit(
        &self,
        tenant_id: TenantId,
        mutation: MemoryMutation,
        opts: CommitOptions,
    ) -> Result<CommitReceipt, CommitError> {
        Submitter::submit(self, tenant_id, mutation, opts).await
    }

    /// Same as [`Submitter::read_range`].
    async fn read_range(
        &self,
        tenant_id: TenantId,
        from_index: u64,
        limit: usize,
    ) -> Result<Vec<CommittedEntry>, CommitError> {
        let entries = Submitter::read_range(self, tenant_id, from_index, limit);
        entries.await
    }

    /// Same as [`Submitter::high_watermark`].
    async fn high_watermark(&self, tenant_id: TenantId) -> Result<u64, CommitError> {
        let watermark = Submitter::high_watermark(self, tenant_id);
        watermark.await
    }

    /// Same as [`Submitter::list_active_tenants`].
    async fn list_active_tenants(&self) -> Result<Vec<TenantId>, CommitError> {
        let tenants = Submitter::list_active_tenants(self);
        tenants.await
    }

    /// Same as [`Submitter::ensure_linearizable`].
    async fn ensure_linearizable(&self) -> Result<(), CommitError> {
        let barrier = Submitter::ensure_linearizable(self);
        barrier.await
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commit::applier::LocalApplier;
    use crate::commit::mutation::OpId;
    use serde_json::json;

    /// Builds an `UpsertMemory` mutation for `rid`; every other field carries
    /// a fixed placeholder value.
    fn upsert_memory(rid: &str) -> MemoryMutation {
        MemoryMutation::UpsertMemory {
            rid: rid.to_string(),
            text: "hello".into(),
            memory_type: "semantic".into(),
            importance: 0.5,
            valence: 0.0,
            half_life: 86400.0,
            namespace: "test".into(),
            certainty: 1.0,
            domain: "general".into(),
            source: "test".into(),
            emotional_state: None,
            embedding: None,
            extracted_entities: vec![],
            created_at_unix_micros: None,
            embedding_model: None,
            metadata: json!({}),
        }
    }

    /// Submitter over an in-memory committer and a `LocalApplier`.
    fn build_submitter() -> LocalSqliteSubmitter {
        let committer = LocalSqliteCommitter::open_in_memory().unwrap();
        let applier: Arc<dyn Applier> = Arc::new(LocalApplier::new());
        LocalSqliteSubmitter::new(Arc::new(committer), applier)
    }

    /// Submits `upsert_memory(rid)` for `tenant` with default options.
    ///
    /// Returns the raw `Result` so each test unwraps at its own call site.
    async fn submit_default(
        s: &LocalSqliteSubmitter,
        tenant: TenantId,
        rid: &str,
    ) -> Result<CommitReceipt, CommitError> {
        s.submit(tenant, upsert_memory(rid), CommitOptions::new())
            .await
    }

    #[tokio::test]
    async fn submit_round_trips_through_log() {
        let s = build_submitter();

        let receipt = submit_default(&s, TenantId::new(1), "rid-1")
            .await
            .unwrap();
        assert_eq!(receipt.log_index, 1);
        assert_eq!(receipt.tenant_id, TenantId::new(1));

        let log = Submitter::read_range(&s, TenantId::new(1), 1, 10)
            .await
            .unwrap();
        assert_eq!(log.len(), 1);
        assert_eq!(log[0].log_index, 1);
    }

    #[tokio::test]
    async fn submit_assigns_monotonic_per_tenant_log_index() {
        let s = build_submitter();

        let first = submit_default(&s, TenantId::new(1), "rid-1")
            .await
            .unwrap();
        let second = submit_default(&s, TenantId::new(1), "rid-2")
            .await
            .unwrap();
        let other_tenant = submit_default(&s, TenantId::new(2), "rid-3")
            .await
            .unwrap();

        assert_eq!(first.log_index, 1);
        assert_eq!(second.log_index, 2);
        // Each tenant has its own index sequence.
        assert_eq!(other_tenant.log_index, 1);
    }

    #[tokio::test]
    async fn submit_is_idempotent_on_op_id() {
        let s = build_submitter();
        let opts = CommitOptions::new().with_op_id(OpId::new_random());

        let original = s
            .submit(TenantId::new(1), upsert_memory("rid-1"), opts.clone())
            .await
            .unwrap();
        let replay = s
            .submit(TenantId::new(1), upsert_memory("rid-1"), opts)
            .await
            .unwrap();

        assert_eq!(original.log_index, replay.log_index);
        assert_eq!(original.op_id, replay.op_id);
        assert_eq!(original.committed_at, replay.committed_at);

        // The replay must not have appended a second entry.
        let log = Submitter::read_range(&s, TenantId::new(1), 1, 10)
            .await
            .unwrap();
        assert_eq!(log.len(), 1);
    }

    #[tokio::test]
    async fn high_watermark_tracks_per_tenant() {
        let s = build_submitter();

        let before = Submitter::high_watermark(&s, TenantId::new(1))
            .await
            .unwrap();
        assert_eq!(before, 0);

        submit_default(&s, TenantId::new(1), "rid-1")
            .await
            .unwrap();
        submit_default(&s, TenantId::new(1), "rid-2")
            .await
            .unwrap();

        let after = Submitter::high_watermark(&s, TenantId::new(1))
            .await
            .unwrap();
        assert_eq!(after, 2);

        let untouched = Submitter::high_watermark(&s, TenantId::new(2))
            .await
            .unwrap();
        assert_eq!(untouched, 0);
    }

    #[tokio::test]
    async fn list_active_tenants_finds_every_writer() {
        let s = build_submitter();

        submit_default(&s, TenantId::new(1), "rid-1")
            .await
            .unwrap();
        submit_default(&s, TenantId::new(7), "rid-2")
            .await
            .unwrap();
        submit_default(&s, TenantId::new(7), "rid-3")
            .await
            .unwrap();

        let mut tenants = Submitter::list_active_tenants(&s).await.unwrap();
        tenants.sort();
        assert_eq!(tenants, vec![TenantId::new(1), TenantId::new(7)]);
    }

    #[tokio::test]
    async fn ensure_linearizable_is_trivially_ok_on_single_node() {
        let s = build_submitter();
        Submitter::ensure_linearizable(&s).await.unwrap();
    }

    #[tokio::test]
    async fn no_wait_does_not_call_applier() {
        let s = build_submitter();
        let opts = CommitOptions::new().no_wait();

        let receipt = s
            .submit(TenantId::new(1), upsert_memory("rid-1"), opts)
            .await
            .unwrap();

        assert!(receipt.applied_at.is_none());
    }

    #[tokio::test]
    async fn submitter_committer_accessor_works() {
        let s = build_submitter();
        let _committer = s.committer();
    }

    // Never called: exists only so the build fails if `Submitter` stops being
    // usable as a trait object.
    #[allow(dead_code)]
    fn _dyn_submitter_compile_check(_s: Arc<dyn Submitter>) {}
}