use crate::sync::types::*;
use anyhow::Result;
use async_trait::async_trait;
use std::sync::Arc;
use std::time::Duration;
#[async_trait]
pub trait DocumentStore: Send + Sync {
    /// Create or replace a document in `collection`, returning its id.
    async fn upsert(&self, collection: &str, document: Document) -> Result<DocumentId>;

    /// Return every document in `collection` matching `query`.
    async fn query(&self, collection: &str, query: &Query) -> Result<Vec<Document>>;

    /// Physically remove a document from `collection`.
    async fn remove(&self, collection: &str, doc_id: &DocumentId) -> Result<()>;

    /// Open a stream of change notifications for documents matching `query`.
    fn observe(&self, collection: &str, query: &Query) -> Result<ChangeStream>;

    /// Fetch a single document by id, or `None` when absent.
    ///
    /// The default implementation issues an equality query against the
    /// `"id"` field and keeps the first hit.
    async fn get(&self, collection: &str, doc_id: &DocumentId) -> Result<Option<Document>> {
        let by_id = Query::Eq {
            field: "id".to_string(),
            value: Value::String(doc_id.clone()),
        };
        let hits = self.query(collection, &by_id).await?;
        Ok(hits.into_iter().next())
    }

    /// Count documents matching `query`.
    ///
    /// The default implementation materializes the full result set; backends
    /// with native counting should override this.
    async fn count(&self, collection: &str, query: &Query) -> Result<usize> {
        self.query(collection, query).await.map(|hits| hits.len())
    }

    /// Delete a document, honoring the collection's deletion policy.
    ///
    /// Immutable collections refuse the delete outright; otherwise the
    /// document is removed and a soft-delete result is reported. `reason` is
    /// part of the interface for auditing backends but unused here.
    async fn delete(
        &self,
        collection: &str,
        doc_id: &DocumentId,
        reason: Option<&str>,
    ) -> Result<crate::qos::DeleteResult> {
        let _ = reason;
        let policy = self.deletion_policy(collection);
        if policy.is_immutable() {
            return Ok(crate::qos::DeleteResult::immutable());
        }
        self.remove(collection, doc_id).await?;
        Ok(crate::qos::DeleteResult::soft_deleted(policy))
    }

    /// Report whether the document carries a truthy `_deleted` marker field.
    /// Missing documents and non-boolean markers read as "not deleted".
    async fn is_deleted(&self, collection: &str, doc_id: &DocumentId) -> Result<bool> {
        let marker = self
            .get(collection, doc_id)
            .await?
            .and_then(|doc| doc.fields.get("_deleted").and_then(|v| v.as_bool()));
        Ok(marker.unwrap_or(false))
    }

    /// Deletion policy for `collection`; defaults to the crate-wide default
    /// policy.
    fn deletion_policy(&self, _collection: &str) -> crate::qos::DeletionPolicy {
        crate::qos::DeletionPolicy::default()
    }

    /// Tombstones recorded for `collection`; none by default.
    async fn get_tombstones(&self, collection: &str) -> Result<Vec<crate::qos::Tombstone>> {
        let _ = collection;
        Ok(Vec::new())
    }

    /// Apply a remote tombstone by removing the referenced document.
    async fn apply_tombstone(&self, tombstone: &crate::qos::Tombstone) -> Result<()> {
        self.remove(&tombstone.collection, &tombstone.document_id)
            .await
    }
}
#[async_trait]
pub trait PeerDiscovery: Send + Sync {
    /// Begin discovering peers.
    async fn start(&self) -> Result<()>;

    /// Stop discovering peers.
    async fn stop(&self) -> Result<()>;

    /// Snapshot of all peers discovered so far.
    async fn discovered_peers(&self) -> Result<Vec<PeerInfo>>;

    /// Manually register a peer reachable at `address` over `transport`.
    async fn add_peer(&self, address: &str, transport: TransportType) -> Result<()>;

    /// Block until `peer_id` is discovered or `timeout` elapses.
    async fn wait_for_peer(&self, peer_id: &PeerId, timeout: Duration) -> Result<()>;

    /// Register a callback invoked for every peer lifecycle event.
    fn on_peer_event(&self, callback: Box<dyn Fn(PeerEvent) + Send + Sync>);

    /// Look up known information about `peer_id`, if any.
    async fn get_peer_info(&self, peer_id: &PeerId) -> Result<Option<PeerInfo>>;

    /// Whether `peer_id` is currently connected; unknown peers count as
    /// disconnected.
    async fn is_peer_connected(&self, peer_id: &PeerId) -> Result<bool> {
        let info = self.get_peer_info(peer_id).await?;
        Ok(info.map_or(false, |peer| peer.connected))
    }
}
#[async_trait]
pub trait SyncEngine: Send + Sync {
    /// Begin synchronizing with peers.
    async fn start_sync(&self) -> Result<()>;

    /// Stop synchronizing with peers.
    async fn stop_sync(&self) -> Result<()>;

    /// Subscribe to sync updates for documents in `collection` matching
    /// `query`.
    async fn subscribe(&self, collection: &str, query: &Query) -> Result<SyncSubscription>;

    /// Set the sync priority for `collection`. The default implementation is
    /// a no-op for backends without priority support.
    async fn set_priority(&self, collection: &str, priority: Priority) -> Result<()> {
        let _ = collection;
        let _ = priority;
        Ok(())
    }

    /// Whether a sync session is currently active.
    async fn is_syncing(&self) -> Result<bool>;

    /// Trigger an immediate sync pass; a no-op by default.
    async fn force_sync(&self) -> Result<()> {
        Ok(())
    }

    /// Attempt a direct connection to a peer endpoint. Returns `true` when a
    /// connection was established; the default implementation never connects.
    async fn connect_to_peer(&self, endpoint_id_hex: &str, addresses: &[String]) -> Result<bool> {
        let _ = endpoint_id_hex;
        let _ = addresses;
        Ok(false)
    }
}
#[async_trait]
pub trait DataSyncBackend: Send + Sync {
    /// Initialize the backend with `config`; must be called before use.
    async fn initialize(&self, config: BackendConfig) -> Result<()>;

    /// Release backend resources.
    async fn shutdown(&self) -> Result<()>;

    /// Handle to this backend's document store.
    fn document_store(&self) -> Arc<dyn DocumentStore>;

    /// Handle to this backend's peer-discovery service.
    fn peer_discovery(&self) -> Arc<dyn PeerDiscovery>;

    /// Handle to this backend's sync engine.
    fn sync_engine(&self) -> Arc<dyn SyncEngine>;

    /// Whether the backend is ready to serve requests; `true` by default.
    async fn is_ready(&self) -> bool {
        true
    }

    /// Identifying metadata; the default is a placeholder that backends are
    /// expected to override.
    fn backend_info(&self) -> BackendInfo {
        BackendInfo {
            name: String::from("Unknown"),
            version: String::from("0.0.0"),
        }
    }

    /// Escape hatch for downcasting to a concrete backend type.
    fn as_any(&self) -> &dyn std::any::Any;
}
/// Identifying metadata for a [`DataSyncBackend`] implementation.
///
/// `PartialEq`/`Eq` are derived so callers and tests can compare backend
/// identities directly instead of field-by-field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BackendInfo {
    /// Human-readable backend name (e.g. "Automerge").
    pub name: String,
    /// Backend version string.
    pub version: String,
}
/// Unit tests for the backend traits: object safety, the provided (default)
/// trait methods, and lightweight in-memory mock implementations of each
/// trait used to drive those defaults.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync::types::TransportConfig;
    use std::collections::HashMap;
    use std::time::SystemTime;

    /// Compile-time check that all four traits remain object-safe
    /// (usable behind `&dyn` / `Arc<dyn ...>`).
    #[test]
    fn test_trait_object_safety() {
        fn _takes_document_store(_: &dyn DocumentStore) {}
        fn _takes_peer_discovery(_: &dyn PeerDiscovery) {}
        fn _takes_sync_engine(_: &dyn SyncEngine) {}
        fn _takes_backend(_: &dyn DataSyncBackend) {}
    }

    /// Mirrors the default `DataSyncBackend::backend_info` body on a plain
    /// struct to pin the placeholder values.
    #[test]
    fn test_backend_info_default() {
        struct Stub;
        impl Stub {
            fn backend_info(&self) -> BackendInfo {
                BackendInfo {
                    name: "Unknown".to_string(),
                    version: "0.0.0".to_string(),
                }
            }
        }
        let info = Stub.backend_info();
        assert_eq!(info.name, "Unknown");
        assert_eq!(info.version, "0.0.0");
    }

    /// `BackendInfo` supports `Debug` and `Clone`, and a clone carries the
    /// same field values.
    #[test]
    fn test_backend_info_debug_clone() {
        let info = BackendInfo {
            name: "Automerge".to_string(),
            version: "0.7.0".to_string(),
        };
        let cloned = info.clone();
        assert_eq!(cloned.name, "Automerge");
        assert_eq!(format!("{:?}", info), format!("{:?}", cloned));
    }

    /// In-memory `DocumentStore` backed by a mutex-guarded map of
    /// collection name -> documents. Only implements the required methods;
    /// the trait's defaults are exercised on top of it.
    struct MockDocStore {
        docs: std::sync::Mutex<HashMap<String, Vec<Document>>>,
    }

    impl MockDocStore {
        fn new() -> Self {
            Self {
                docs: std::sync::Mutex::new(HashMap::new()),
            }
        }

        /// Test helper: append a document directly, bypassing `upsert`.
        fn insert(&self, collection: &str, doc: Document) {
            // `into_inner` recovers the data even if a previous test
            // panicked while holding the lock (poisoned mutex).
            let mut docs = self.docs.lock().unwrap_or_else(|e| e.into_inner());
            docs.entry(collection.to_string()).or_default().push(doc);
        }
    }

    #[async_trait]
    impl DocumentStore for MockDocStore {
        async fn upsert(&self, collection: &str, document: Document) -> anyhow::Result<DocumentId> {
            // Documents without an id get the fixed placeholder "auto-id".
            let id = document.id.clone().unwrap_or_else(|| "auto-id".to_string());
            let mut doc = document;
            doc.id = Some(id.clone());
            self.insert(collection, doc);
            Ok(id)
        }
        async fn query(&self, collection: &str, query: &Query) -> anyhow::Result<Vec<Document>> {
            let docs = self.docs.lock().unwrap_or_else(|e| e.into_inner());
            let col = docs.get(collection).cloned().unwrap_or_default();
            match query {
                Query::All => Ok(col),
                // Eq on "id" matches the document id; any other field name
                // is matched against the `fields` map.
                Query::Eq { field, value } => Ok(col
                    .into_iter()
                    .filter(|d| {
                        if field == "id" {
                            d.id.as_deref() == value.as_str()
                        } else {
                            d.fields.get(field.as_str()) == Some(value)
                        }
                    })
                    .collect()),
                // All other query variants are unsupported by the mock and
                // return the whole collection.
                _ => Ok(col),
            }
        }
        async fn remove(&self, collection: &str, doc_id: &DocumentId) -> anyhow::Result<()> {
            let mut docs = self.docs.lock().unwrap_or_else(|e| e.into_inner());
            if let Some(col) = docs.get_mut(collection) {
                col.retain(|d| d.id.as_deref() != Some(doc_id.as_str()));
            }
            Ok(())
        }
        fn observe(&self, _collection: &str, _query: &Query) -> anyhow::Result<ChangeStream> {
            // Sender is dropped immediately, so the stream never yields.
            let (_tx, rx) = tokio::sync::mpsc::unbounded_channel();
            Ok(ChangeStream { receiver: rx })
        }
    }

    /// Default `get` finds an existing document via the id query.
    #[tokio::test]
    async fn test_document_store_get_found() {
        let store = MockDocStore::new();
        let mut fields = HashMap::new();
        fields.insert("name".to_string(), Value::String("test".to_string()));
        let doc = Document::with_id("doc1", fields);
        store.insert("col", doc);
        let result = store.get("col", &"doc1".to_string()).await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().id, Some("doc1".to_string()));
    }

    /// Default `get` yields `None` for an unknown id.
    #[tokio::test]
    async fn test_document_store_get_not_found() {
        let store = MockDocStore::new();
        let result = store.get("col", &"missing".to_string()).await.unwrap();
        assert!(result.is_none());
    }

    /// Default `count` reflects the number of matching documents, including
    /// zero for an empty collection.
    #[tokio::test]
    async fn test_document_store_count() {
        let store = MockDocStore::new();
        store.insert("col", Document::with_id("a", HashMap::new()));
        store.insert("col", Document::with_id("b", HashMap::new()));
        let count = store.count("col", &Query::All).await.unwrap();
        assert_eq!(count, 2);
        let count = store.count("empty", &Query::All).await.unwrap();
        assert_eq!(count, 0);
    }

    /// Default `delete` under the default (soft-delete) policy reports the
    /// document as deleted.
    #[tokio::test]
    async fn test_document_store_delete_default() {
        let store = MockDocStore::new();
        store.insert("col", Document::with_id("d1", HashMap::new()));
        let result = store
            .delete("col", &"d1".to_string(), Some("test reason"))
            .await
            .unwrap();
        assert!(result.deleted);
    }

    /// `is_deleted` is false when the `_deleted` marker field is absent.
    #[tokio::test]
    async fn test_document_store_is_deleted_false() {
        let store = MockDocStore::new();
        store.insert("col", Document::with_id("d1", HashMap::new()));
        let deleted = store.is_deleted("col", &"d1".to_string()).await.unwrap();
        assert!(!deleted);
    }

    /// `is_deleted` is true when `_deleted` is `Bool(true)`.
    #[tokio::test]
    async fn test_document_store_is_deleted_true() {
        let store = MockDocStore::new();
        let mut fields = HashMap::new();
        fields.insert("_deleted".to_string(), Value::Bool(true));
        store.insert("col", Document::with_id("d1", fields));
        let deleted = store.is_deleted("col", &"d1".to_string()).await.unwrap();
        assert!(deleted);
    }

    /// `is_deleted` treats a missing document as not deleted.
    #[tokio::test]
    async fn test_document_store_is_deleted_nonexistent() {
        let store = MockDocStore::new();
        let deleted = store
            .is_deleted("col", &"missing".to_string())
            .await
            .unwrap();
        assert!(!deleted);
    }

    /// Default `get_tombstones` returns an empty list.
    #[tokio::test]
    async fn test_document_store_get_tombstones_default() {
        let store = MockDocStore::new();
        let tombstones = store.get_tombstones("col").await.unwrap();
        assert!(tombstones.is_empty());
    }

    /// Default `apply_tombstone` removes the referenced document.
    #[tokio::test]
    async fn test_document_store_apply_tombstone_default() {
        let store = MockDocStore::new();
        store.insert("col", Document::with_id("d1", HashMap::new()));
        let tombstone = crate::qos::Tombstone {
            collection: "col".to_string(),
            document_id: "d1".to_string(),
            deleted_at: SystemTime::now(),
            deleted_by: "user-1".to_string(),
            lamport: 1,
            reason: None,
        };
        store.apply_tombstone(&tombstone).await.unwrap();
        let result = store.get("col", &"d1".to_string()).await.unwrap();
        assert!(result.is_none());
    }

    /// The default deletion policy is soft-delete for any collection.
    #[test]
    fn test_deletion_policy_default() {
        let store = MockDocStore::new();
        let policy = store.deletion_policy("any_collection");
        assert!(policy.is_soft_delete());
    }

    /// Minimal `SyncEngine` implementing only the required methods, so the
    /// trait's default methods can be exercised.
    struct MockSyncEngine;

    #[async_trait]
    impl SyncEngine for MockSyncEngine {
        async fn start_sync(&self) -> anyhow::Result<()> {
            Ok(())
        }
        async fn stop_sync(&self) -> anyhow::Result<()> {
            Ok(())
        }
        async fn subscribe(
            &self,
            _collection: &str,
            _query: &Query,
        ) -> anyhow::Result<SyncSubscription> {
            Ok(SyncSubscription::new("test", ()))
        }
        async fn is_syncing(&self) -> anyhow::Result<bool> {
            Ok(true)
        }
    }

    /// Default `set_priority` is a successful no-op.
    #[tokio::test]
    async fn test_sync_engine_set_priority_default_noop() {
        let engine = MockSyncEngine;
        let result = engine.set_priority("col", Priority::High).await;
        assert!(result.is_ok());
    }

    /// Default `force_sync` is a successful no-op.
    #[tokio::test]
    async fn test_sync_engine_force_sync_default_noop() {
        let engine = MockSyncEngine;
        let result = engine.force_sync().await;
        assert!(result.is_ok());
    }

    /// Default `connect_to_peer` never connects and returns `false`.
    #[tokio::test]
    async fn test_sync_engine_connect_to_peer_default() {
        let engine = MockSyncEngine;
        let result = engine
            .connect_to_peer("abcd1234", &["192.168.1.1:5000".to_string()])
            .await
            .unwrap();
        assert!(!result);
    }

    /// Build a `PeerInfo` fixture with the given id and connection state.
    fn make_peer_info(peer_id: &str, connected: bool) -> PeerInfo {
        PeerInfo {
            peer_id: peer_id.to_string(),
            address: None,
            transport: TransportType::Tcp,
            connected,
            last_seen: SystemTime::now(),
            metadata: HashMap::new(),
        }
    }

    /// Stub discovery: knows a connected "peer-1" and a disconnected
    /// "peer-disconnected"; everything else is unknown.
    struct MockPeerDiscovery;

    #[async_trait]
    impl PeerDiscovery for MockPeerDiscovery {
        async fn start(&self) -> anyhow::Result<()> {
            Ok(())
        }
        async fn stop(&self) -> anyhow::Result<()> {
            Ok(())
        }
        async fn discovered_peers(&self) -> anyhow::Result<Vec<PeerInfo>> {
            Ok(vec![make_peer_info("peer-1", true)])
        }
        async fn add_peer(&self, _address: &str, _transport: TransportType) -> anyhow::Result<()> {
            Ok(())
        }
        async fn wait_for_peer(&self, _peer_id: &PeerId, _timeout: Duration) -> anyhow::Result<()> {
            Ok(())
        }
        fn on_peer_event(&self, _callback: Box<dyn Fn(PeerEvent) + Send + Sync>) {}
        async fn get_peer_info(&self, peer_id: &PeerId) -> anyhow::Result<Option<PeerInfo>> {
            if peer_id == "peer-1" {
                Ok(Some(make_peer_info("peer-1", true)))
            } else if peer_id == "peer-disconnected" {
                Ok(Some(make_peer_info("peer-disconnected", false)))
            } else {
                Ok(None)
            }
        }
    }

    /// Default `is_peer_connected` is true for a known connected peer.
    #[tokio::test]
    async fn test_peer_discovery_is_connected_true() {
        let disc = MockPeerDiscovery;
        let connected = disc.is_peer_connected(&"peer-1".to_string()).await.unwrap();
        assert!(connected);
    }

    /// Default `is_peer_connected` is false for a known disconnected peer.
    #[tokio::test]
    async fn test_peer_discovery_is_connected_false_disconnected() {
        let disc = MockPeerDiscovery;
        let connected = disc
            .is_peer_connected(&"peer-disconnected".to_string())
            .await
            .unwrap();
        assert!(!connected);
    }

    /// Default `is_peer_connected` is false for an unknown peer.
    #[tokio::test]
    async fn test_peer_discovery_is_connected_false_unknown() {
        let disc = MockPeerDiscovery;
        let connected = disc
            .is_peer_connected(&"unknown".to_string())
            .await
            .unwrap();
        assert!(!connected);
    }

    /// Backend stub wiring the three mocks together; accessors allocate a
    /// fresh mock on every call.
    struct MockBackend;

    #[async_trait]
    impl DataSyncBackend for MockBackend {
        async fn initialize(&self, _config: BackendConfig) -> anyhow::Result<()> {
            Ok(())
        }
        async fn shutdown(&self) -> anyhow::Result<()> {
            Ok(())
        }
        fn document_store(&self) -> Arc<dyn DocumentStore> {
            Arc::new(MockDocStore::new())
        }
        fn peer_discovery(&self) -> Arc<dyn PeerDiscovery> {
            Arc::new(MockPeerDiscovery)
        }
        fn sync_engine(&self) -> Arc<dyn SyncEngine> {
            Arc::new(MockSyncEngine)
        }
        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
    }

    /// Default `is_ready` reports ready.
    #[tokio::test]
    async fn test_data_sync_backend_is_ready_default() {
        let backend = MockBackend;
        assert!(backend.is_ready().await);
    }

    /// Default `backend_info` carries the placeholder name/version.
    #[test]
    fn test_data_sync_backend_backend_info_default() {
        let backend = MockBackend;
        let info = backend.backend_info();
        assert_eq!(info.name, "Unknown");
        assert_eq!(info.version, "0.0.0");
    }

    /// `as_any` allows downcasting back to the concrete backend type.
    #[test]
    fn test_data_sync_backend_as_any() {
        let backend = MockBackend;
        let any = backend.as_any();
        assert!(any.downcast_ref::<MockBackend>().is_some());
    }

    /// The three accessor methods all return usable trait objects.
    #[tokio::test]
    async fn test_data_sync_backend_accessors() {
        let backend = MockBackend;
        let _store = backend.document_store();
        let _disc = backend.peer_discovery();
        let _engine = backend.sync_engine();
    }

    /// An immutable deletion policy makes the default `delete` refuse the
    /// operation (`deleted == false`).
    ///
    /// NOTE(review): this local store duplicates `ImmutableStore` in
    /// `test_immutable_doc_store_methods` below — candidate for dedup.
    #[tokio::test]
    async fn test_document_store_delete_immutable_policy() {
        #[allow(dead_code)]
        struct ImmutableDocStore {
            docs: std::sync::Mutex<HashMap<String, Vec<Document>>>,
        }
        impl ImmutableDocStore {
            fn new() -> Self {
                Self {
                    docs: std::sync::Mutex::new(HashMap::new()),
                }
            }
        }
        #[async_trait]
        impl DocumentStore for ImmutableDocStore {
            async fn upsert(
                &self,
                _collection: &str,
                _document: Document,
            ) -> anyhow::Result<DocumentId> {
                Ok("id".to_string())
            }
            async fn query(
                &self,
                _collection: &str,
                _query: &Query,
            ) -> anyhow::Result<Vec<Document>> {
                Ok(vec![])
            }
            async fn remove(&self, _collection: &str, _doc_id: &DocumentId) -> anyhow::Result<()> {
                Ok(())
            }
            fn observe(&self, _collection: &str, _query: &Query) -> anyhow::Result<ChangeStream> {
                let (_tx, rx) = tokio::sync::mpsc::unbounded_channel();
                Ok(ChangeStream { receiver: rx })
            }
            fn deletion_policy(&self, _collection: &str) -> crate::qos::DeletionPolicy {
                crate::qos::DeletionPolicy::Immutable
            }
        }
        let store = ImmutableDocStore::new();
        let result = store
            .delete("col", &"doc1".to_string(), None)
            .await
            .unwrap();
        assert!(!result.deleted);
    }

    /// `upsert` assigns the mock's "auto-id" placeholder to id-less
    /// documents, and the document is then retrievable by that id.
    #[tokio::test]
    async fn test_document_store_upsert_auto_id() {
        let store = MockDocStore::new();
        let doc = Document {
            id: None,
            fields: HashMap::new(),
            updated_at: SystemTime::now(),
        };
        let id = store.upsert("col", doc).await.unwrap();
        assert_eq!(id, "auto-id");
        let result = store.get("col", &"auto-id".to_string()).await.unwrap();
        assert!(result.is_some());
    }

    /// `Query::Eq` on a non-id field filters by that field's value.
    #[tokio::test]
    async fn test_document_store_query_field_match() {
        let store = MockDocStore::new();
        let mut fields = HashMap::new();
        fields.insert("status".to_string(), Value::String("active".to_string()));
        store.insert("col", Document::with_id("d1", fields.clone()));
        let mut fields2 = HashMap::new();
        fields2.insert("status".to_string(), Value::String("inactive".to_string()));
        store.insert("col", Document::with_id("d2", fields2));
        let results = store
            .query(
                "col",
                &Query::Eq {
                    field: "status".to_string(),
                    value: Value::String("active".to_string()),
                },
            )
            .await
            .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, Some("d1".to_string()));
    }

    /// The mock's catch-all arm returns every document for unsupported
    /// query variants (here `Query::Gt`).
    #[tokio::test]
    async fn test_document_store_query_other_variant() {
        let store = MockDocStore::new();
        store.insert("col", Document::with_id("d1", HashMap::new()));
        store.insert("col", Document::with_id("d2", HashMap::new()));
        let results = store
            .query(
                "col",
                &Query::Gt {
                    field: "x".to_string(),
                    value: serde_json::json!(0),
                },
            )
            .await
            .unwrap();
        assert_eq!(results.len(), 2);
    }

    /// `delete` succeeds with `reason: None` as well.
    #[tokio::test]
    async fn test_document_store_delete_with_none_reason() {
        let store = MockDocStore::new();
        store.insert("col", Document::with_id("d1", HashMap::new()));
        let result = store.delete("col", &"d1".to_string(), None).await.unwrap();
        assert!(result.deleted);
    }

    /// Removing a missing document is not an error.
    #[tokio::test]
    async fn test_document_store_remove_nonexistent() {
        let store = MockDocStore::new();
        store.remove("col", &"missing".to_string()).await.unwrap();
    }

    /// A non-boolean `_deleted` value reads as not deleted
    /// (`as_bool` fails, default is false).
    #[tokio::test]
    async fn test_document_store_is_deleted_with_non_bool_field() {
        let store = MockDocStore::new();
        let mut fields = HashMap::new();
        fields.insert(
            "_deleted".to_string(),
            Value::String("not-a-bool".to_string()),
        );
        store.insert("col", Document::with_id("d1", fields));
        let deleted = store.is_deleted("col", &"d1".to_string()).await.unwrap();
        assert!(!deleted);
    }

    /// `observe` on the mock returns a (never-yielding) stream.
    #[test]
    fn test_mock_doc_store_observe() {
        let store = MockDocStore::new();
        let stream = store.observe("col", &Query::All);
        assert!(stream.is_ok());
    }

    /// Smoke test over every `MockPeerDiscovery` method.
    #[tokio::test]
    async fn test_mock_peer_discovery_methods() {
        let disc = MockPeerDiscovery;
        disc.start().await.unwrap();
        disc.stop().await.unwrap();
        let peers = disc.discovered_peers().await.unwrap();
        assert_eq!(peers.len(), 1);
        disc.add_peer("10.0.0.1:5000", TransportType::Tcp)
            .await
            .unwrap();
        disc.wait_for_peer(&"peer-1".to_string(), Duration::from_secs(1))
            .await
            .unwrap();
        disc.on_peer_event(Box::new(|_| {}));
        let info = disc.get_peer_info(&"peer-1".to_string()).await.unwrap();
        assert!(info.is_some());
    }

    /// Smoke test over every `MockSyncEngine` required method.
    #[tokio::test]
    async fn test_mock_sync_engine_methods() {
        let engine = MockSyncEngine;
        engine.start_sync().await.unwrap();
        engine.stop_sync().await.unwrap();
        let sub = engine.subscribe("col", &Query::All).await.unwrap();
        assert_eq!(sub.collection(), "test");
        assert!(engine.is_syncing().await.unwrap());
    }

    /// Backend lifecycle: initialize with a full config, then shut down.
    #[tokio::test]
    async fn test_mock_backend_lifecycle() {
        let backend = MockBackend;
        backend
            .initialize(BackendConfig {
                app_id: "test-app".to_string(),
                persistence_dir: std::path::PathBuf::from("/tmp/test"),
                shared_key: None,
                transport: TransportConfig::default(),
                extra: HashMap::new(),
            })
            .await
            .unwrap();
        backend.shutdown().await.unwrap();
    }

    /// Exercises every method of an immutable-policy store, including the
    /// default `delete` refusing the operation.
    ///
    /// NOTE(review): `ImmutableStore` duplicates `ImmutableDocStore` from
    /// `test_document_store_delete_immutable_policy` — candidate for dedup.
    #[tokio::test]
    async fn test_immutable_doc_store_methods() {
        #[allow(dead_code)]
        struct ImmutableStore {
            docs: std::sync::Mutex<HashMap<String, Vec<Document>>>,
        }
        impl ImmutableStore {
            fn new() -> Self {
                Self {
                    docs: std::sync::Mutex::new(HashMap::new()),
                }
            }
        }
        #[async_trait]
        impl DocumentStore for ImmutableStore {
            async fn upsert(
                &self,
                _collection: &str,
                _document: Document,
            ) -> anyhow::Result<DocumentId> {
                Ok("id".to_string())
            }
            async fn query(
                &self,
                _collection: &str,
                _query: &Query,
            ) -> anyhow::Result<Vec<Document>> {
                Ok(vec![])
            }
            async fn remove(&self, _collection: &str, _doc_id: &DocumentId) -> anyhow::Result<()> {
                Ok(())
            }
            fn observe(&self, _collection: &str, _query: &Query) -> anyhow::Result<ChangeStream> {
                let (_tx, rx) = tokio::sync::mpsc::unbounded_channel();
                Ok(ChangeStream { receiver: rx })
            }
            fn deletion_policy(&self, _collection: &str) -> crate::qos::DeletionPolicy {
                crate::qos::DeletionPolicy::Immutable
            }
        }
        let store = ImmutableStore::new();
        let id = store
            .upsert("col", Document::with_id("x", HashMap::new()))
            .await
            .unwrap();
        assert_eq!(id, "id");
        let docs = store.query("col", &Query::All).await.unwrap();
        assert!(docs.is_empty());
        store.remove("col", &"x".to_string()).await.unwrap();
        let _stream = store.observe("col", &Query::All).unwrap();
        let result = store
            .delete("col", &"doc1".to_string(), None)
            .await
            .unwrap();
        assert!(!result.deleted);
    }
}