use std::{
collections::HashMap,
sync::{Arc, RwLock},
task::Waker,
};
use super::{ChunkIdentifier, Position};
#[derive(Debug, Clone, PartialEq)]
pub enum Update<Item, Gap> {
NewItemsChunk {
previous: Option<ChunkIdentifier>,
new: ChunkIdentifier,
next: Option<ChunkIdentifier>,
},
NewGapChunk {
previous: Option<ChunkIdentifier>,
new: ChunkIdentifier,
next: Option<ChunkIdentifier>,
gap: Gap,
},
RemoveChunk(ChunkIdentifier),
PushItems {
at: Position,
items: Vec<Item>,
},
ReplaceItem {
at: Position,
item: Item,
},
RemoveItem {
at: Position,
},
DetachLastItems {
at: Position,
},
StartReattachItems,
EndReattachItems,
Clear,
}
impl<Item, Gap> Update<Item, Gap> {
pub fn into_items(self) -> Vec<Item> {
match self {
Update::NewItemsChunk { .. }
| Update::NewGapChunk { .. }
| Update::RemoveChunk(_)
| Update::RemoveItem { .. }
| Update::DetachLastItems { .. }
| Update::StartReattachItems
| Update::EndReattachItems
| Update::Clear => vec![],
Update::PushItems { items, .. } => items,
Update::ReplaceItem { item, .. } => vec![item],
}
}
}
#[derive(Debug)]
pub struct ObservableUpdates<Item, Gap> {
pub(super) inner: Arc<RwLock<UpdatesInner<Item, Gap>>>,
}
impl<Item, Gap> ObservableUpdates<Item, Gap> {
pub(super) fn new() -> Self {
Self { inner: Arc::new(RwLock::new(UpdatesInner::new())) }
}
pub(super) fn push(&mut self, update: Update<Item, Gap>) {
self.inner.write().unwrap().push(update);
}
pub(super) fn clear_pending(&mut self) {
self.inner.write().unwrap().clear_pending();
}
pub fn take(&mut self) -> Vec<Update<Item, Gap>>
where
Item: Clone,
Gap: Clone,
{
self.inner.write().unwrap().take().to_owned()
}
#[cfg(test)]
pub(super) fn subscribe(&mut self) -> UpdatesSubscriber<Item, Gap> {
let token = self.new_reader_token();
UpdatesSubscriber::new(Arc::downgrade(&self.inner), token)
}
pub(super) fn new_reader_token(&mut self) -> ReaderToken {
let mut inner = self.inner.write().unwrap();
inner.last_token += 1;
let last_token = inner.last_token;
inner.last_index_per_reader.insert(last_token, 0);
last_token
}
}
pub(super) type ReaderToken = usize;
#[derive(Debug)]
pub(super) struct UpdatesInner<Item, Gap> {
updates: Vec<Update<Item, Gap>>,
last_index_per_reader: HashMap<ReaderToken, usize>,
last_token: ReaderToken,
wakers: Vec<Waker>,
}
impl<Item, Gap> UpdatesInner<Item, Gap> {
const MAIN_READER_TOKEN: ReaderToken = 0;
fn new() -> Self {
Self {
updates: Vec::with_capacity(8),
last_index_per_reader: {
let mut map = HashMap::with_capacity(2);
map.insert(Self::MAIN_READER_TOKEN, 0);
map
},
last_token: Self::MAIN_READER_TOKEN,
wakers: Vec::with_capacity(2),
}
}
fn push(&mut self, update: Update<Item, Gap>) {
self.updates.push(update);
for waker in self.wakers.drain(..) {
waker.wake();
}
}
fn clear_pending(&mut self) {
self.updates.clear();
for idx in self.last_index_per_reader.values_mut() {
*idx = 0;
}
}
fn take(&mut self) -> &[Update<Item, Gap>] {
self.take_with_token(Self::MAIN_READER_TOKEN)
}
pub(super) fn take_with_token(&mut self, token: ReaderToken) -> &[Update<Item, Gap>] {
self.garbage_collect();
let index = self
.last_index_per_reader
.get_mut(&token)
.expect("Given `UpdatesToken` does not map to any index");
let slice = &self.updates[*index..];
*index = self.updates.len();
slice
}
pub(super) fn is_reader_up_to_date(&self, token: ReaderToken) -> bool {
*self.last_index_per_reader.get(&token).expect("unknown reader token") == self.updates.len()
}
#[cfg(test)]
fn len(&self) -> usize {
self.updates.len()
}
fn garbage_collect(&mut self) {
let min_index = self.last_index_per_reader.values().min().copied().unwrap_or(0);
if min_index > 0 {
let _ = self.updates.drain(0..min_index);
for index in self.last_index_per_reader.values_mut() {
*index -= min_index;
}
}
}
}
#[cfg(test)]
pub(super) struct UpdatesSubscriber<Item, Gap> {
updates: std::sync::Weak<RwLock<UpdatesInner<Item, Gap>>>,
token: ReaderToken,
}
#[cfg(test)]
impl<Item, Gap> UpdatesSubscriber<Item, Gap> {
#[cfg(test)]
fn new(updates: std::sync::Weak<RwLock<UpdatesInner<Item, Gap>>>, token: ReaderToken) -> Self {
Self { updates, token }
}
}
#[cfg(test)]
impl<Item, Gap> futures_core::Stream for UpdatesSubscriber<Item, Gap>
where
Item: Clone,
Gap: Clone,
{
type Item = Vec<Update<Item, Gap>>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
context: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let Some(updates) = self.updates.upgrade() else {
return std::task::Poll::Ready(None);
};
let mut updates = updates.write().unwrap();
let the_updates = updates.take_with_token(self.token);
if the_updates.is_empty() {
updates.wakers.push(context.waker().clone());
return std::task::Poll::Pending;
}
std::task::Poll::Ready(Some(the_updates.to_owned()))
}
}
#[cfg(test)]
impl<Item, Gap> Drop for UpdatesSubscriber<Item, Gap> {
fn drop(&mut self) {
if let Some(updates) = self.updates.upgrade() {
let mut updates = updates.write().unwrap();
let _ = updates.last_index_per_reader.remove(&self.token);
}
}
}
#[cfg(test)]
mod tests {
use std::{
sync::{Arc, Mutex},
task::{Context, Poll, Wake},
};
use assert_matches::assert_matches;
use futures_core::Stream;
use futures_util::pin_mut;
use super::{super::LinkedChunk, ChunkIdentifier, Position, UpdatesInner};
use crate::linked_chunk::Update;
#[test]
fn test_updates_take_and_garbage_collector() {
use super::Update::*;
let mut linked_chunk = LinkedChunk::<10, char, ()>::new_with_update_history();
let main_token = UpdatesInner::<char, ()>::MAIN_READER_TOKEN;
let other_token = {
let updates = linked_chunk.updates().unwrap();
let mut inner = updates.inner.write().unwrap();
inner.last_token += 1;
let other_token = inner.last_token;
inner.last_index_per_reader.insert(other_token, 0);
other_token
};
{
let updates = linked_chunk.updates().unwrap();
assert_eq!(
updates.take(),
&[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }],
);
assert_eq!(
updates.inner.write().unwrap().take_with_token(other_token),
&[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }],
);
}
{
let updates = linked_chunk.updates().unwrap();
assert!(updates.take().is_empty());
assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());
}
linked_chunk.push_items_back(['a']);
linked_chunk.push_items_back(['b']);
linked_chunk.push_items_back(['c']);
{
let updates = linked_chunk.updates().unwrap();
{
assert_eq!(updates.inner.read().unwrap().len(), 3);
}
assert_eq!(
updates.take(),
&[
PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
]
);
{
let inner = updates.inner.read().unwrap();
assert_eq!(inner.len(), 3);
let indices = &inner.last_index_per_reader;
assert_eq!(indices.get(&main_token), Some(&3));
assert_eq!(indices.get(&other_token), Some(&0));
}
}
linked_chunk.push_items_back(['d']);
linked_chunk.push_items_back(['e']);
linked_chunk.push_items_back(['f']);
{
let updates = linked_chunk.updates().unwrap();
assert_eq!(
updates.inner.write().unwrap().take_with_token(other_token),
&[
PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
PushItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] },
PushItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] },
PushItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] },
]
);
{
let inner = updates.inner.read().unwrap();
assert_eq!(inner.len(), 6);
let indices = &inner.last_index_per_reader;
assert_eq!(indices.get(&main_token), Some(&3));
assert_eq!(indices.get(&other_token), Some(&6));
}
}
{
let updates = linked_chunk.updates().unwrap();
assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());
{
let inner = updates.inner.read().unwrap();
assert_eq!(inner.len(), 3);
let indices = &inner.last_index_per_reader;
assert_eq!(indices.get(&main_token), Some(&0));
assert_eq!(indices.get(&other_token), Some(&3));
}
}
linked_chunk.push_items_back(['g']);
linked_chunk.push_items_back(['h']);
linked_chunk.push_items_back(['i']);
{
let updates = linked_chunk.updates().unwrap();
assert_eq!(
updates.take(),
&[
PushItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] },
PushItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] },
PushItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] },
PushItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] },
PushItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] },
PushItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] },
]
);
assert_eq!(
updates.inner.write().unwrap().take_with_token(other_token),
&[
PushItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] },
PushItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] },
PushItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] },
]
);
{
let inner = updates.inner.read().unwrap();
assert_eq!(inner.len(), 3);
let indices = &inner.last_index_per_reader;
assert_eq!(indices.get(&main_token), Some(&3));
assert_eq!(indices.get(&other_token), Some(&3));
}
}
{
let updates = linked_chunk.updates().unwrap();
assert!(updates.take().is_empty());
assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());
{
let inner = updates.inner.read().unwrap();
assert_eq!(inner.len(), 0);
let indices = &inner.last_index_per_reader;
assert_eq!(indices.get(&main_token), Some(&0));
assert_eq!(indices.get(&other_token), Some(&0));
}
}
}
struct CounterWaker {
number_of_wakeup: Mutex<usize>,
}
impl Wake for CounterWaker {
fn wake(self: Arc<Self>) {
*self.number_of_wakeup.lock().unwrap() += 1;
}
}
#[test]
fn test_updates_stream() {
use super::Update::*;
let counter_waker = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });
let waker = counter_waker.clone().into();
let mut context = Context::from_waker(&waker);
let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();
let updates_subscriber = linked_chunk.updates().unwrap().subscribe();
pin_mut!(updates_subscriber);
assert_matches!(
updates_subscriber.as_mut().poll_next(&mut context),
Poll::Ready(Some(items)) => {
assert_eq!(
items,
&[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
);
}
);
assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);
assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 0);
linked_chunk.push_items_back(['a']);
assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 1);
assert_matches!(
updates_subscriber.as_mut().poll_next(&mut context),
Poll::Ready(Some(items)) => {
assert_eq!(
items,
&[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
);
}
);
assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);
linked_chunk.push_items_back(['b']);
linked_chunk.push_items_back(['c']);
assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2);
assert_eq!(
linked_chunk.updates().unwrap().take(),
&[
NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None },
PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
]
);
assert_matches!(
updates_subscriber.as_mut().poll_next(&mut context),
Poll::Ready(Some(items)) => {
assert_eq!(
items,
&[
PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
]
);
}
);
assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);
drop(linked_chunk);
assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Ready(None));
assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2);
}
#[test]
fn test_updates_multiple_streams() {
use super::Update::*;
let counter_waker1 = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });
let counter_waker2 = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });
let waker1 = counter_waker1.clone().into();
let waker2 = counter_waker2.clone().into();
let mut context1 = Context::from_waker(&waker1);
let mut context2 = Context::from_waker(&waker2);
let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();
let updates_subscriber1 = linked_chunk.updates().unwrap().subscribe();
pin_mut!(updates_subscriber1);
let updates_subscriber2_token = {
let updates_subscriber2 = linked_chunk.updates().unwrap().subscribe();
pin_mut!(updates_subscriber2);
assert_matches!(
updates_subscriber1.as_mut().poll_next(&mut context1),
Poll::Ready(Some(items)) => {
assert_eq!(
items,
&[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
);
}
);
assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 0);
assert_matches!(
updates_subscriber2.as_mut().poll_next(&mut context2),
Poll::Ready(Some(items)) => {
assert_eq!(
items,
&[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
);
}
);
assert_matches!(updates_subscriber2.as_mut().poll_next(&mut context2), Poll::Pending);
assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 0);
linked_chunk.push_items_back(['a']);
assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 1);
assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 1);
assert_matches!(
updates_subscriber1.as_mut().poll_next(&mut context1),
Poll::Ready(Some(items)) => {
assert_eq!(
items,
&[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
);
}
);
assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
assert_matches!(
updates_subscriber2.as_mut().poll_next(&mut context2),
Poll::Ready(Some(items)) => {
assert_eq!(
items,
&[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
);
}
);
assert_matches!(updates_subscriber2.as_mut().poll_next(&mut context2), Poll::Pending);
linked_chunk.push_items_back(['b']);
linked_chunk.push_items_back(['c']);
assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 2);
assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 2);
assert_matches!(
updates_subscriber1.as_mut().poll_next(&mut context1),
Poll::Ready(Some(items)) => {
assert_eq!(
items,
&[
PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
]
);
}
);
assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
let _ = linked_chunk.updates().unwrap().take();
let _ = linked_chunk.updates().unwrap().take();
{
let updates = linked_chunk.updates().unwrap();
let inner = updates.inner.read().unwrap();
assert_eq!(inner.len(), 2);
let indices = &inner.last_index_per_reader;
assert_eq!(indices.get(&updates_subscriber1.token), Some(&2));
assert_eq!(indices.get(&updates_subscriber2.token), Some(&0));
}
assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
{
let updates = linked_chunk.updates().unwrap();
let inner = updates.inner.read().unwrap();
assert_eq!(inner.len(), 2);
let indices = &inner.last_index_per_reader;
assert_eq!(indices.get(&updates_subscriber1.token), Some(&2));
assert_eq!(indices.get(&updates_subscriber2.token), Some(&0));
}
updates_subscriber2.token
};
assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
{
let updates = linked_chunk.updates().unwrap();
let inner = updates.inner.read().unwrap();
assert_eq!(inner.len(), 0);
let indices = &inner.last_index_per_reader;
assert_eq!(indices.get(&updates_subscriber1.token), Some(&0));
assert_eq!(indices.get(&updates_subscriber2_token), None); }
drop(linked_chunk);
assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Ready(None));
}
#[test]
fn test_update_into_items() {
let updates: Update<_, u32> =
Update::PushItems { at: Position::new(ChunkIdentifier(0), 0), items: vec![1, 2, 3] };
assert_eq!(updates.into_items(), vec![1, 2, 3]);
let updates: Update<u32, u32> = Update::Clear;
assert!(updates.into_items().is_empty());
let updates: Update<u32, u32> =
Update::RemoveItem { at: Position::new(ChunkIdentifier(0), 0) };
assert!(updates.into_items().is_empty());
let updates: Update<u32, u32> =
Update::ReplaceItem { at: Position::new(ChunkIdentifier(0), 0), item: 42 };
assert_eq!(updates.into_items(), vec![42]);
}
}