#![cfg(feature = "automerge-backend")]
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use peat_mesh::sync::types::ChangeEvent;
use peat_protocol::network::IrohTransport;
use peat_protocol::security::FormationKey;
use peat_protocol::storage::AutomergeStore;
use peat_protocol::sync::automerge::AutomergeIrohBackend;
use peat_protocol::sync::traits::DataSyncBackend;
use peat_protocol::sync::types::{BackendConfig, Document, Query, TransportConfig};
use serial_test::serial;
use tokio::time::timeout;
/// Builds an initialized [`AutomergeIrohBackend`] for a single test.
///
/// The store is opened inside a freshly created temporary directory, and the
/// same path is handed to the backend as `persistence_dir`. The
/// [`tempfile::TempDir`] handle is returned next to the backend so the caller
/// decides how long the directory stays around.
///
/// # Panics
///
/// Panics if the temporary directory cannot be created, the store cannot be
/// opened, the transport cannot be constructed, or `initialize` fails.
async fn make_backend() -> (Arc<AutomergeIrohBackend>, tempfile::TempDir) {
    let workdir = tempfile::tempdir().expect("tempdir");
    let persistence_dir = workdir.path().to_path_buf();

    let store = AutomergeStore::open(workdir.path()).expect("store");
    let transport = IrohTransport::new().await.expect("transport");
    let backend = Arc::new(AutomergeIrohBackend::from_parts(
        Arc::new(store),
        Arc::new(transport),
    ));

    backend
        .initialize(BackendConfig {
            app_id: String::from("origin-threading-test"),
            persistence_dir,
            // A fresh secret is generated for every backend instance.
            shared_key: Some(FormationKey::generate_secret()),
            transport: TransportConfig::default(),
            extra: HashMap::new(),
        })
        .await
        .expect("initialize");

    (backend, workdir)
}
/// Builds a [`Document`] with the given `id` whose fields are `lat` and `lon`,
/// each stored as the JSON encoding of the corresponding `f64`.
fn track_doc(id: &str, lat: f64, lon: f64) -> Document {
    let fields: HashMap<String, serde_json::Value> = HashMap::from([
        ("lat".to_owned(), serde_json::json!(lat)),
        ("lon".to_owned(), serde_json::json!(lon)),
    ]);
    Document::with_id(id.to_owned(), fields)
}
/// Waits for the next `ChangeEvent::Updated` on `stream` whose document id
/// equals `expected_doc_id` and returns that event.
///
/// Every other event received in the meantime is discarded.
///
/// # Panics
///
/// Panics with "stream closed" if `recv()` yields no event, and panics if no
/// matching event shows up within five seconds.
async fn next_updated_for(
    stream: &mut peat_mesh::sync::types::ChangeStream,
    expected_doc_id: &str,
) -> ChangeEvent {
    let wait_for_match = async {
        loop {
            let event = stream.receiver.recv().await.expect("stream closed");
            let is_match = matches!(
                &event,
                ChangeEvent::Updated { document, .. }
                    if document.id.as_deref() == Some(expected_doc_id)
            );
            if is_match {
                return event;
            }
        }
    };

    timeout(Duration::from_secs(5), wait_for_match)
        .await
        .expect("timed out waiting for matching Updated event")
}
/// Verifies that the origin label passed to `upsert_with_origin` is reported on
/// the `Updated` event delivered to an observer of the same collection.
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn upsert_with_origin_propagates_to_observer() {
    let (backend, _workdir) = make_backend().await;
    let documents = backend.document_store();
    // The observer is registered before the write is issued.
    let mut changes = documents
        .observe("tracks", &Query::All)
        .expect("observe");

    documents
        .upsert_with_origin(
            "tracks",
            track_doc("track-001", 40.0, -74.0),
            Some("ble".into()),
        )
        .await
        .expect("upsert");

    let ChangeEvent::Updated { origin, .. } = next_updated_for(&mut changes, "track-001").await
    else {
        unreachable!("filtered by next_updated_for");
    };
    assert_eq!(
        origin,
        Some("ble".to_string()),
        "origin must thread through observer"
    );

    // The shutdown result is discarded.
    let _ = backend.shutdown().await;
}
/// Verifies that an upsert issued with `None` as its origin produces an
/// `Updated` event whose origin is also `None`.
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn upsert_with_none_origin_emits_none() {
    let (backend, _workdir) = make_backend().await;
    let documents = backend.document_store();
    // The observer is registered before the write is issued.
    let mut changes = documents
        .observe("tracks", &Query::All)
        .expect("observe");

    documents
        .upsert_with_origin("tracks", track_doc("track-002", 41.0, -75.0), None)
        .await
        .expect("upsert");

    let ChangeEvent::Updated { origin, .. } = next_updated_for(&mut changes, "track-002").await
    else {
        unreachable!();
    };
    assert_eq!(origin, None, "None origin must round-trip");

    // The shutdown result is discarded.
    let _ = backend.shutdown().await;
}
/// Verifies that a rapid burst of upserts to one document id never surfaces an
/// `Updated` event without an origin.
///
/// Five upserts for `track-rapid` are issued back to back, each tagged with an
/// origin of the form `ble-<i>`. The test does not require one event per
/// upsert: it only requires that at least one matching event arrives and that
/// every matching event carries an origin starting with `ble-`.
///
/// Events are drained for up to two seconds. A 100 ms window without events
/// ends the drain only after at least one matching event has been seen, so a
/// slow first delivery still gets the full two-second budget.
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn rapid_same_key_burst_keeps_origin_present() {
    let (backend, _tmp) = make_backend().await;
    let store = backend.document_store();
    let mut stream = store.observe("tracks", &Query::All).expect("observe");

    for i in 0..5_i32 {
        let doc = track_doc("track-rapid", 40.0 + f64::from(i) * 0.001, -74.0);
        store
            .upsert_with_origin("tracks", doc, Some(format!("ble-{}", i)))
            .await
            .expect("upsert");
    }

    let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
    let mut delivered_with_origin = 0;
    while tokio::time::Instant::now() < deadline {
        let recv_result =
            tokio::time::timeout(Duration::from_millis(100), stream.receiver.recv()).await;
        match recv_result {
            Ok(Some(ChangeEvent::Updated {
                document, origin, ..
            })) if document.id.as_deref() == Some("track-rapid") => {
                let label = origin.expect(
                    "rapid burst must not surface None origin when every upsert passed Some(...)",
                );
                assert!(
                    label.starts_with("ble-"),
                    "origin {:?} must come from this test's burst",
                    label
                );
                delivered_with_origin += 1;
            }
            // Events for other documents or of other kinds are skipped.
            Ok(Some(_)) => {}
            // The stream has closed; nothing more can arrive.
            Ok(None) => break,
            // A quiet window after at least one delivery means the burst has drained.
            Err(_) if delivered_with_origin > 0 => break,
            // Nothing delivered yet: keep polling until the overall deadline.
            Err(_) => {}
        }
    }

    assert!(
        delivered_with_origin >= 1,
        "at least one Updated event must reach the observer for the burst"
    );

    // The shutdown result is discarded.
    let _ = backend.shutdown().await;
}