use fieldx_plus::child_build;
use fieldx_plus::fx_plus;
use tokio::sync::OwnedRwLockWriteGuard;
use crate::Cache;
use crate::DataController;
type KeyGuard<DC> = (
<DC as DataController>::Key,
OwnedRwLockWriteGuard<Option<<DC as DataController>::CacheUpdate>>,
);
type KeyOptGuard<DC> = (
<DC as DataController>::Key,
Option<OwnedRwLockWriteGuard<Option<<DC as DataController>::CacheUpdate>>>,
);
#[fx_plus(
child(Cache<DC>, rc_strong),
parent,
default(off),
sync,
rc,
get(off),
builder(vis(pub(crate)))
)]
pub struct UpdateIterator<DC>
where
DC: DataController + Send + Sync + 'static,
{
#[fieldx(inner_mut, private, get, get_mut, builder(private))]
unprocessed: Vec<KeyOptGuard<DC>>,
#[fieldx(inner_mut, private, get(copy), set, builder(off))]
next_idx: usize,
#[fieldx(inner_mut, get_mut(vis(pub(crate))), builder(off))]
worked: Vec<KeyGuard<DC>>,
}
impl<DC> UpdateIterator<DC>
where
DC: DataController + Send + Sync + 'static,
{
#[inline(always)]
fn take_back(&self, key_guard: (DC::Key, OwnedRwLockWriteGuard<Option<DC::CacheUpdate>>)) {
if key_guard.1.is_some() {
self.worked_mut().push(key_guard);
}
}
#[inline]
pub fn confirm_all(&self) {
for (_key, guard) in self.worked_mut().iter_mut() {
guard.take();
}
}
#[inline(always)]
pub fn len(&self) -> usize {
self.unprocessed().len()
}
#[inline(always)]
pub fn is_empty(&self) -> bool {
self.unprocessed().is_empty()
}
pub fn next(&self) -> Option<UpdateIteratorItem<DC>> {
let mut unprocessed = self.unprocessed_mut();
loop {
let next_idx = self.next_idx();
if next_idx >= unprocessed.len() {
return None;
}
let Some((key, guard)) = unprocessed.get_mut(next_idx).map(|(k, g)| (k.clone(), g.take()))
else {
panic!(
"Internal error of UpdateIterator<{}>: next update key not found at index {next_idx}",
std::any::type_name::<DC>()
);
};
self.set_next_idx(next_idx + 1);
let guard = if let Some(g) = guard {
g
}
else if let Some(update) = self.parent().updates().get(&key).cloned() {
let Ok(guard) = update.data.clone().try_write_owned()
else {
continue;
};
guard
}
else {
continue;
};
if guard.is_some() {
return Some(
child_build!(
self, UpdateIteratorItem<DC> {
key_guard: Some((key.clone(), guard)),
}
)
.unwrap(),
);
}
}
}
#[inline]
pub fn reset(&self) {
self.set_next_idx(0);
self.worked_mut().truncate(0);
}
}
impl<DC> UpdateIteratorBuilder<DC>
where
DC: DataController + Send + Sync + 'static,
{
pub(crate) fn keys(self, keys: Vec<DC::Key>) -> Self {
let unprocessed = keys.into_iter().map(|key| (key, None)).collect::<Vec<_>>();
self.unprocessed(unprocessed)
}
pub(crate) fn key_guard(self, kg: (DC::Key, OwnedRwLockWriteGuard<Option<DC::CacheUpdate>>)) -> Self {
self.unprocessed(vec![(kg.0, Some(kg.1))])
}
}
#[fx_plus(
child(UpdateIterator<DC>, rc_strong),
default(off),
sync,
)]
pub struct UpdateIteratorItem<DC>
where
DC: DataController + Send + Sync + 'static,
{
key_guard: Option<KeyGuard<DC>>,
}
impl<DC> UpdateIteratorItem<DC>
where
DC: DataController + Send + Sync + 'static,
{
pub fn update(&self) -> &DC::CacheUpdate {
self.key_guard
.as_ref()
.expect("Internal error: guard is None")
.1
.as_ref()
.expect("Internal error: update data cannot be None")
}
pub fn key(&self) -> &DC::Key {
&self.key_guard.as_ref().expect("Internal error: guard is None").0
}
pub fn confirm(mut self) {
if let Some(mut guard) = self.key_guard.take() {
guard.1.take();
}
else {
unreachable!("Internal error: guard is None");
}
}
}
impl<DC> Drop for UpdateIteratorItem<DC>
where
DC: DataController + Send + Sync + 'static,
{
fn drop(&mut self) {
if let Some(key_guard) = self.key_guard.take() {
self.parent().take_back(key_guard);
}
}
}