use std::{collections::HashMap, marker::PhantomData, sync::Arc};
use hyphae::{JoinExt, MapExt, Signal, SubscriptionGuard, Watchable};
use log::{debug, error};
use serde::de::DeserializeOwned;
use crate::{
client::MykoClient,
common::with_id::WithId,
core::item::Eventable,
query::{QueryFactory, QueryHandler, QueryParams},
server::CellServerCtx,
wire::{MEvent, MEventType},
};
#[derive(Debug, Clone, Copy)]
pub struct EntityStoreSyncOptions {
pub delete_stale_remote: bool,
}
impl Default for EntityStoreSyncOptions {
fn default() -> Self {
Self {
delete_stale_remote: true,
}
}
}
type ItemComparator<T> = Arc<dyn Fn(&T, &T) -> bool + Send + Sync>;
pub struct EntityStoreSyncConfig<T, QLocal, QRemote>
where
T: Eventable + WithId + Clone,
QLocal: QueryFactory + QueryHandler + QueryParams<Item = T> + Clone + Send + Sync + 'static,
QRemote: QueryParams<Item = T> + Clone + Send + Sync + 'static,
{
pub client: MykoClient,
pub local_ctx: Arc<CellServerCtx>,
pub local_query: QLocal,
pub remote_query: QRemote,
pub options: EntityStoreSyncOptions,
pub items_equal: ItemComparator<T>,
}
pub struct EntityStoreSync<T>
where
T: Eventable + WithId + Clone + Send + Sync + 'static,
{
_sync_guard: SubscriptionGuard,
_marker: PhantomData<T>,
}
impl<T> EntityStoreSync<T>
where
T: Eventable + WithId + Clone + Send + Sync + 'static,
{
fn index_items(items: &[Arc<T>]) -> HashMap<String, Arc<T>> {
items
.iter()
.cloned()
.map(|item| (item.id().to_string(), item))
.collect()
}
fn sync_once(
client: &MykoClient,
local_items: &[Arc<T>],
remote_items: &[Arc<T>],
options: EntityStoreSyncOptions,
items_equal: &(dyn Fn(&T, &T) -> bool + Send + Sync),
) {
let local = Self::index_items(local_items);
let remote = Self::index_items(remote_items);
let events = Self::reconcile(&local, &remote, options, items_equal);
if events.is_empty() {
return;
}
debug!(
"EntityStoreSync local={} remote={} events={}",
local.len(),
remote.len(),
events.len()
);
if let Err(err) = client.send_event_batch(events) {
error!("EntityStoreSync send failed: {}", err);
}
}
pub fn reconcile(
local: &HashMap<String, Arc<T>>,
remote: &HashMap<String, Arc<T>>,
options: EntityStoreSyncOptions,
items_equal: &(dyn Fn(&T, &T) -> bool + Send + Sync),
) -> Vec<MEvent> {
let mut events = Vec::new();
for (id, local_item) in local {
let should_set = match remote.get(id) {
Some(remote_item) => !items_equal(local_item.as_ref(), remote_item.as_ref()),
None => true,
};
if should_set {
events.push(MEvent::from_item(local_item.as_ref(), MEventType::SET, ""));
}
}
if options.delete_stale_remote {
for (id, remote_item) in remote {
if !local.contains_key(id) {
events.push(MEvent::del(remote_item.as_ref(), ""));
}
}
}
events
}
pub fn new<QLocal, QRemote>(config: EntityStoreSyncConfig<T, QLocal, QRemote>) -> Arc<Self>
where
T: DeserializeOwned + std::fmt::Debug,
QLocal: QueryFactory + QueryHandler + QueryParams<Item = T> + Clone + Send + Sync + 'static,
QRemote: QueryParams<Item = T> + Clone + Send + Sync + 'static,
{
let EntityStoreSyncConfig {
client,
local_ctx,
local_query,
remote_query,
options,
items_equal,
} = config;
let local_cell = local_ctx
.query_map(local_query, local_ctx.new_server_transaction())
.entries()
.map(|entries: &Vec<(Arc<str>, Arc<T>)>| {
entries
.iter()
.map(|(_, item)| item.clone())
.collect::<Vec<_>>()
});
let remote_cell = client.watch_query(remote_query);
let joined = local_cell.join(&remote_cell);
let sync_client = client.clone();
let options_for_join = options;
let items_equal_for_join = items_equal.clone();
let joined_guard = joined.subscribe(move |signal| {
if let Signal::Value(value) = signal {
let (local_items, remote_items) = &**value;
Self::sync_once(
&sync_client,
local_items,
remote_items,
options_for_join,
items_equal_for_join.as_ref(),
);
}
});
Arc::new(EntityStoreSync {
_sync_guard: joined_guard,
_marker: PhantomData,
})
}
}