use std::sync::{Arc, Mutex as StdMutex};
use ai_store_core::{
Event, Label, Patch, ProjectionSink, Seq, Store, StoreConfig, StoreError, StreamId, Timestamp,
};
use ai_store_mem::{MemCacheBackend, MemEventBackend};
use ai_store_sync::BlockingStore;
use async_trait::async_trait;
use serde_json::{json, Value};
/// Deserializes a JSON literal into a [`Patch`] for use as a test fixture.
///
/// # Panics
///
/// Panics when `v` does not deserialize into a `Patch`. Because of `#[track_caller]`,
/// the reported panic location is the test that supplied the malformed fixture rather
/// than this helper.
#[track_caller]
fn patch(v: Value) -> Patch {
    // The return type already pins the target type, so no turbofish is needed.
    serde_json::from_value(v).expect("test fixture must deserialize into a Patch")
}
/// Assembles a [`Store`] over freshly created in-memory event and cache backends.
///
/// The two list arguments of `Store::new` are passed as empty vectors and the
/// configuration is `StoreConfig::default()`.
fn build_store() -> Store {
    let event_backend = Arc::new(MemEventBackend::new());
    let cache_backend = Arc::new(MemCacheBackend::new());
    Store::new(
        event_backend,
        cache_backend,
        Vec::new(),
        Vec::new(),
        StoreConfig::default(),
    )
}
/// Test double for [`ProjectionSink`] that keeps a log of the commits it receives.
#[derive(Default)]
struct RecordSink {
    /// `(stream id, sequence number)` pairs, appended by `commit` in call order.
    /// Guarded by a mutex because `commit` only gets `&self`.
    seen: StdMutex<Vec<(String, u64)>>,
    /// Identifier handed back from `ProjectionSink::id`.
    id: String,
}
#[async_trait]
impl ProjectionSink for RecordSink {
    /// Returns the identifier stored in the `id` field.
    fn id(&self) -> &str {
        self.id.as_str()
    }

    /// Records the stream id and sequence number of the commit in `seen`.
    ///
    /// The projected state and the event are ignored; the call always returns `Ok(())`.
    ///
    /// # Panics
    ///
    /// Panics if the `seen` mutex is poisoned.
    async fn commit(
        &self,
        stream: &StreamId,
        seq: Seq,
        _state: &Value,
        _event: &Event,
    ) -> Result<(), StoreError> {
        let record = (stream.as_str().to_owned(), seq.0);
        // The guard is a temporary that is released at the end of this statement.
        self.seen.lock().unwrap().push(record);
        Ok(())
    }
}
/// Appends two events through a `BlockingStore` created with `BlockingStore::new`, then
/// checks the returned sequence numbers, the resulting state, the head, and the number
/// of events read back.
#[test]
fn owned_runtime_append_state_read_roundtrip() {
    let blocking = BlockingStore::new(build_store()).expect("runtime");
    let stream = StreamId::new("doc");

    let init = patch(json!([{ "op": "add", "path": "", "value": { "n": 0 } }]));
    let first = blocking.append(&stream, "init", init, json!({})).unwrap();
    assert_eq!(first, Seq(1));

    let bump = patch(json!([{ "op": "replace", "path": "/n", "value": 1 }]));
    let second = blocking.append(&stream, "bump", bump, json!({})).unwrap();
    assert_eq!(second, Seq(2));

    let state = blocking.state(&stream).unwrap();
    assert_eq!(state, json!({ "n": 1 }));

    let head = blocking.head(&stream).unwrap();
    assert_eq!(head, Some(Seq(2)));

    let history = blocking.read(&stream, Seq(1), 10).unwrap();
    assert_eq!(history.len(), 2);
}
/// After two appends, reverting to `Seq(1)` must return `Seq(3)`, restore the state
/// written by the first event, and leave the head at `Seq(3)`.
#[test]
fn owned_runtime_revert_appends_event() {
    let blocking = BlockingStore::new(build_store()).expect("runtime");
    let stream = StreamId::new("doc");

    let init = patch(json!([{ "op": "add", "path": "", "value": { "n": 0 } }]));
    blocking.append(&stream, "init", init, json!({})).unwrap();

    let bump = patch(json!([{ "op": "replace", "path": "/n", "value": 9 }]));
    blocking.append(&stream, "bump", bump, json!({})).unwrap();

    let reverted_at = blocking.revert(&stream, Seq(1)).unwrap();
    assert_eq!(reverted_at, Seq(3));

    let state = blocking.state(&stream).unwrap();
    assert_eq!(state, json!({ "n": 0 }));

    let head = blocking.head(&stream).unwrap();
    assert_eq!(head, Some(Seq(3)));
}
/// Imports a single event with an explicit timestamp and checks that the event read
/// back carries exactly that timestamp in its `at` field.
#[test]
fn import_event_preserves_caller_supplied_timestamp_via_blocking_facade() {
    let bs = BlockingStore::new(build_store()).expect("runtime");
    let s = StreamId::new("doc");
    let at = Timestamp(1_700_000_000_000);
    let seq = bs
        .import_event(
            &s,
            "legacy_create",
            patch(json!([{ "op": "add", "path": "", "value": { "n": 0 } }])),
            json!({}),
            at,
        )
        .unwrap();
    assert_eq!(seq, Seq(1));

    let events = bs.read(&s, Seq(1), 1).unwrap();
    // Check the count before indexing: the stream holds exactly one event, and a missing
    // event should fail as a readable assertion, not as an index-out-of-bounds panic.
    assert_eq!(events.len(), 1);
    assert_eq!(events[0].at, at);
}
/// A clone taken before any write must observe an event appended through the original
/// handle.
#[test]
fn clone_shares_state() {
    let writer = BlockingStore::new(build_store()).expect("runtime");
    let reader = writer.clone();
    let stream = StreamId::new("doc");

    let init = patch(json!([{ "op": "add", "path": "", "value": { "hello": "world" } }]));
    writer.append(&stream, "init", init, json!({})).unwrap();

    let observed = reader.state(&stream).unwrap();
    assert_eq!(observed, json!({ "hello": "world" }));
}
/// Builds a `BlockingStore` on the current Tokio runtime's handle, appends through a
/// clone inside `spawn_blocking`, and then reads the state back via the async store
/// returned by `as_async`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn borrowed_handle_from_spawn_blocking() {
    let blocking = BlockingStore::with_handle(build_store(), tokio::runtime::Handle::current());
    let worker = blocking.clone();

    // The blocking append runs inside `spawn_blocking`, not directly in this async fn.
    let write_init = move || {
        let stream = StreamId::new("doc");
        let init = patch(json!([{ "op": "add", "path": "", "value": { "k": 1 } }]));
        worker.append(&stream, "init", init, json!({})).unwrap();
    };
    tokio::task::spawn_blocking(write_init).await.unwrap();

    let stream = StreamId::new("doc");
    let state = blocking.as_async().state(&stream).await.unwrap();
    assert_eq!(state, json!({ "k": 1 }));
}
/// Sets a label, deletes it, and checks that: the first delete reports `true`, resolving
/// the label afterwards fails with `StoreError::UnknownLabel`, and a second delete
/// reports `false`.
#[test]
fn label_delete_removes_label_via_blocking_facade() {
    let blocking = BlockingStore::new(build_store()).expect("runtime");
    let stream = StreamId::new("doc");
    let label = Label::new("v1");

    let init = patch(json!([{ "op": "add", "path": "", "value": {} }]));
    blocking.append(&stream, "init", init, json!({})).unwrap();
    blocking.label_set(&stream, &label, Seq(1)).unwrap();

    // First delete: the label was present.
    assert!(blocking.label_delete(&stream, &label).unwrap());

    let resolve_err = blocking.label_resolve(&stream, &label).unwrap_err();
    assert!(matches!(resolve_err, StoreError::UnknownLabel(missing) if missing == "v1"));

    // Second delete: nothing left to remove.
    assert!(!blocking.label_delete(&stream, &label).unwrap());
}
/// Registers a `RecordSink` with id `"record"`, appends one event, and checks that
/// `materialize_to_sink` returns `Seq(1)` and produces exactly one `("doc", 1)` commit.
/// An unregistered sink id must fail with `StoreError::UnknownSink`.
#[test]
fn materialize_to_sink_dumps_head_via_blocking_facade() {
    let sink = Arc::new(RecordSink {
        id: String::from("record"),
        seen: StdMutex::new(Vec::new()),
    });
    let event_backend = Arc::new(MemEventBackend::new());
    let cache_backend = Arc::new(MemCacheBackend::new());
    let store = Store::new(
        event_backend,
        cache_backend,
        Vec::new(),
        vec![sink.clone()],
        StoreConfig::default(),
    );
    let blocking = BlockingStore::new(store).expect("runtime");
    let stream = StreamId::new("doc");

    let init = patch(json!([{ "op": "add", "path": "", "value": { "n": 1 } }]));
    blocking.append(&stream, "init", init, json!({})).unwrap();

    // Drop anything recorded so far so that only commits made by the explicit
    // materialize call below are compared.
    sink.seen.lock().unwrap().clear();

    let dumped_at = blocking.materialize_to_sink(&stream, "record", None).unwrap();
    assert_eq!(dumped_at, Seq(1));

    let recorded = sink.seen.lock().unwrap().clone();
    assert_eq!(recorded, vec![(String::from("doc"), 1)]);

    let unknown = blocking.materialize_to_sink(&stream, "nope", None).unwrap_err();
    assert!(matches!(unknown, StoreError::UnknownSink(sink_id) if sink_id == "nope"));
}