use std::{pin::Pin, sync::Arc, task::Context, task::Poll};
use futures_util::Stream;
use log::warn;
use tokio::sync::mpsc;
use crate::{
ErrorKind, Href,
property::Property,
sync::{
Side,
declare::StoragePair,
mapping::ResolvedMapping,
operation::{MappingUidSource, Operation, PropertyOp},
plan::{
Plan, PlanError,
collection::{GenerateError, generate_collection_operations},
create_mappings_for_pair,
},
status::StatusDatabase,
},
watch::Event,
};
/// A plan that yields [`Operation`]s (or [`PlanError`]s) as they arrive.
///
/// Wraps the receiving half of a `tokio::sync::mpsc` channel; items are read
/// from it through this type's [`Stream`] implementation.
pub struct IncrementalPlan(mpsc::Receiver<Result<Operation, PlanError>>);
impl IncrementalPlan {
pub async fn new(
pair: StoragePair,
status: Option<Arc<StatusDatabase>>,
events: impl IntoIterator<Item = (Event, Side)>,
) -> Result<IncrementalPlan, PlanError> {
let mappings = create_mappings_for_pair(&pair).await?;
let mut affected = Vec::new();
let mut properties: Vec<(Arc<ResolvedMapping>, Property)> = Vec::new();
let mut has_storage_event = false;
for (event, side) in events {
match event {
Event::Storage => {
has_storage_event = true;
break;
}
Event::Collection(col, _) | Event::Item(col, ..) => {
if let Some(mapping) = find_mapping(&mappings, side, &col) {
push_affected(&mut affected, mapping);
properties.retain(|(m, _)| {
m.for_side(side).href() != mapping.for_side(side).href()
});
} else {
warn!("Event for unknown collection; falling back to full sync.");
has_storage_event = true;
break;
}
}
Event::Property(col, property, _) => {
if let Some(mapping) = find_mapping(&mappings, side, &col) {
if affected
.iter()
.any(|m| m.for_side(side).href() == mapping.for_side(side).href())
{
continue;
}
if properties.iter().any(|(m, p)| {
m.for_side(side).href() == mapping.for_side(side).href()
&& *p == property
}) {
continue;
}
properties.push((mapping.clone(), property));
} else {
warn!("Property event for unknown collection '{col}'. Ignoring.");
}
}
}
}
if has_storage_event {
let rx = Plan::receiver(pair, status, mappings)?;
return Ok(IncrementalPlan(rx));
}
let (tx, rx) = mpsc::channel(4);
tokio::spawn(run_generator(tx, pair, status, affected, properties));
Ok(IncrementalPlan(rx))
}
}
/// Returns the first mapping whose collection on `side` has the given `href`,
/// or `None` if no mapping matches.
fn find_mapping<'a>(
    mappings: &'a [Arc<ResolvedMapping>],
    side: Side,
    href: &Href,
) -> Option<&'a Arc<ResolvedMapping>> {
    for candidate in mappings {
        if candidate.for_side(side).href() == href {
            return Some(candidate);
        }
    }
    None
}
/// Appends a clone of `mapping` to `affected` unless an existing entry already
/// shares its href on side A or on side B.
fn push_affected(affected: &mut Vec<Arc<ResolvedMapping>>, mapping: &Arc<ResolvedMapping>) {
    // "Not present" means every existing entry differs on *both* sides.
    let is_new = affected.iter().all(|existing| {
        existing.a().href() != mapping.a().href() && existing.b().href() != mapping.b().href()
    });
    if is_new {
        affected.push(Arc::clone(mapping));
    }
}
/// Produces the operations for an incremental plan and sends them over `tx`.
///
/// First runs `generate_collection_operations` for every mapping in
/// `mappings`, then evaluates each `(mapping, property)` pair in `properties`.
/// Plan errors are forwarded over `tx` and generation moves on to the next
/// entry. The function returns early as soon as the channel is found closed.
async fn run_generator(
    tx: mpsc::Sender<Result<Operation, PlanError>>,
    pair: StoragePair,
    status: Option<Arc<StatusDatabase>>,
    mappings: Vec<Arc<ResolvedMapping>>,
    properties: Vec<(Arc<ResolvedMapping>, Property)>,
) {
    for mapping in mappings {
        let outcome =
            generate_collection_operations(&tx, &pair, mapping, status.as_deref()).await;
        let plan_error = match outcome {
            Ok(()) => continue,
            Err(GenerateError::ChannelClosed) => return,
            Err(GenerateError::Plan(e)) => e,
        };
        if tx.send(Err(plan_error)).await.is_err() {
            return;
        }
    }

    for (mapping, property) in properties {
        let outcome =
            generate_single_property_op(&mapping, property, &pair, status.as_deref()).await;
        let message = match outcome {
            // Nothing to do for this property.
            Ok(None) => continue,
            Ok(Some(op)) => Ok(Operation::Property(op)),
            Err(e) => Err(e),
        };
        if tx.send(message).await.is_err() {
            return;
        }
    }
}
/// Decides which action, if any, is needed for one property of one mapping.
///
/// Reads the property's value from both storages and its previously recorded
/// value from the status database, then defers to
/// `pair.mode.decide_property_action`.
///
/// Returns `Ok(None)` without producing an operation when:
///
/// - no status database was supplied,
/// - the status database yields no mapping uid for the mapping, or
/// - either storage reports `ErrorKind::Unsupported` or
///   `ErrorKind::DoesNotExist` for the property.
///
/// # Errors
///
/// Any other storage error, and a failure to read the previous value from the
/// status database, is returned as a [`PlanError`].
async fn generate_single_property_op(
    mapping: &Arc<ResolvedMapping>,
    property: Property,
    pair: &StoragePair,
    status: Option<&StatusDatabase>,
) -> Result<Option<PropertyOp>, PlanError> {
    let status = match status {
        Some(db) => db,
        None => {
            warn!("Property sync: unknown collection '{}'.", mapping.alias());
            return Ok(None);
        }
    };

    // NOTE(review): a failed lookup is handled exactly like a missing uid;
    // the underlying error is not reported.
    let mapping_uid = match status.get_mapping_uid(mapping.a().href(), mapping.b().href()) {
        Ok(Some(uid)) => uid,
        _ => {
            warn!("Property sync: no mapping_uid for '{}'.", mapping.alias());
            return Ok(None);
        }
    };

    let fetched_a = pair
        .storage_a()
        .get_property(mapping.a().href(), property)
        .await;
    let value_a = match fetched_a {
        Ok(value) => value,
        Err(e) => {
            if matches!(e.kind, ErrorKind::Unsupported | ErrorKind::DoesNotExist) {
                return Ok(None);
            }
            return Err(e.into());
        }
    };

    let fetched_b = pair
        .storage_b()
        .get_property(mapping.b().href(), property)
        .await;
    let value_b = match fetched_b {
        Ok(value) => value,
        Err(e) => {
            if matches!(e.kind, ErrorKind::Unsupported | ErrorKind::DoesNotExist) {
                return Ok(None);
            }
            return Err(e.into());
        }
    };

    let previous = status.get_property(mapping_uid, property)?;
    let uid_source = MappingUidSource::Immediate(mapping_uid);
    let action = pair.mode.decide_property_action(
        property,
        value_a,
        value_b,
        previous,
        mapping,
        &uid_source,
        None,
    );
    Ok(action)
}
/// Yields whatever arrives on the wrapped channel receiver.
impl Stream for IncrementalPlan {
    type Item = Result<Operation, PlanError>;

    /// Polls the inner receiver via `poll_recv` and returns its result.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let receiver = &mut self.get_mut().0;
        receiver.poll_recv(cx)
    }
}