#![cfg(feature = "log")]
use crate::Compactable;
use std::{collections::HashSet, marker::PhantomData, sync::Arc};
use tokio::{
sync::{
watch::{Receiver, Sender},
Mutex,
},
task::yield_now,
};
/// A compacting multi-producer/multi-consumer log built on a
/// `tokio::sync::watch` channel: the channel's single value is the entire
/// log state (`Inner`), and subscribers are woken whenever it changes.
/// Cloned handles share the same underlying log.
#[derive(Debug, Clone)]
pub struct Syndicate<A> {
    // Watch sender owning the shared log state.
    sender: Sender<Inner<A>>,
}
impl<A> Syndicate<A> {
    /// Creates a new compacting log.
    ///
    /// * `linear_min` — number of recent elements kept verbatim after a
    ///   compaction pass.
    /// * `linear_hi` — linear-buffer length at which publishers start
    ///   yielding so subscribers can catch up (backpressure high-water mark).
    /// * `linear_max` — linear-buffer length that triggers compaction.
    ///
    /// # Panics
    /// Panics unless `linear_min <= linear_hi <= linear_max`. In particular,
    /// `linear_min > linear_max` would make the capacity computation in
    /// `Inner::compact` (`linear_max - linear.len()`) underflow at runtime.
    pub fn new(linear_min: usize, linear_hi: usize, linear_max: usize) -> Self {
        assert!(
            linear_min <= linear_hi && linear_hi <= linear_max,
            "invalid Syndicate config: expected linear_min <= linear_hi <= linear_max, \
             got {linear_min}, {linear_hi}, {linear_max}"
        );
        let sender = Sender::new(Inner::<A> {
            linear: Vec::new(),
            non_linear: Vec::new(),
            offset: 0,
            linear_max,
            linear_hi,
            linear_min,
        });
        Self { sender }
    }
    /// Subscribes starting at `offset`: elements already published at or
    /// after `offset` are delivered first as backlog, then new publications.
    pub fn subscribe_at<B>(&self, offset: usize) -> Subscription<A, B>
    where
        A: Clone,
    {
        let mut receiver = self.sender.subscribe();
        // Mark the current value as seen and capture everything since `offset`.
        let (offset, backlog) = receiver.borrow_and_update().since(offset);
        Subscription {
            backlog,
            offset,
            receiver,
            marker: PhantomData,
        }
    }
    /// Subscribes from the very beginning of the log (offset 0).
    pub fn subscribe<B>(&self) -> Subscription<A, B>
    where
        A: Clone,
    {
        self.subscribe_at(0)
    }
    /// Creates a publisher handle that converts `B` into `A` on push.
    pub fn publish<B>(&self) -> Publisher<A, B> {
        Publisher {
            sender: self.sender.clone(),
            marker: PhantomData,
        }
    }
    /// Returns the current head offset together with the elements published
    /// at or after `offset`, in publication order (oldest first).
    pub fn snapshot(&self, offset: usize) -> (usize, Vec<A>)
    where
        A: Clone,
    {
        let (offset, mut elements) = self.sender.borrow().since(offset);
        // `since` returns newest first; flip to chronological order.
        elements.reverse();
        (offset, elements)
    }
}
impl<A> Default for Syndicate<A> {
fn default() -> Self {
Syndicate::new(100, 200, 205)
}
}
/// Shared log state held inside the watch channel.
#[derive(Debug)]
struct Inner<A> {
    /// Recent elements in publication order (oldest first), kept verbatim.
    linear: Vec<A>,
    /// Compacted older elements, sorted by descending offset; `compact`
    /// keeps at most one entry (the newest) per compaction key.
    non_linear: Vec<Indexed<A>>,
    /// Total number of elements ever pushed; the offset of the next element.
    offset: usize,
    /// `linear` length that triggers compaction.
    linear_max: usize,
    /// `linear` length at which `push` reports backpressure.
    linear_hi: usize,
    /// Number of recent elements retained in `linear` after a compaction.
    linear_min: usize,
}
impl<A> Inner<A>
where
    A: Clone,
{
    /// Returns the current head offset and every element published at or
    /// after `offset`, ordered newest first so that `Vec::pop` on the result
    /// yields elements in publication order.
    fn since(&self, offset: usize) -> (usize, Vec<A>) {
        if offset < self.offset {
            // Offset of the oldest element still held in the linear buffer.
            let offset0 = self.offset - self.linear.len();
            let elements = if offset >= offset0 {
                // Requested range lies entirely within the linear buffer.
                let bound = offset - offset0;
                self.linear[bound..].iter().rev().cloned().collect()
            } else {
                // Range reaches into compacted history. `non_linear` is kept
                // sorted by descending offset, so `take_while` selects
                // exactly the compacted entries newer than `offset`.
                let non_linear = self
                    .non_linear
                    .iter()
                    .take_while(|c| c.offset > offset)
                    .map(|c| &c.value);
                // Newest linear elements first, then compacted entries, so
                // popping from the end walks oldest to newest.
                self.linear
                    .iter()
                    .rev()
                    .chain(non_linear)
                    .cloned()
                    .collect()
            };
            (self.offset, elements)
        } else {
            // Caller is already caught up: nothing new to deliver.
            (offset, Vec::new())
        }
    }
}
impl<A> Inner<A>
where
    A: Compactable,
{
    /// Appends `value` to the log, compacting when the linear buffer is full.
    ///
    /// Returns `true` once the linear buffer has reached the high-water mark
    /// (`linear_hi`), signalling the publisher to yield so subscribers can
    /// drain their backlogs.
    fn push(&mut self, value: A) -> bool {
        self.linear.push(value);
        self.offset += 1;
        if self.linear.len() >= self.linear_max {
            self.compact()
        }
        self.linear.len() >= self.linear_hi
    }
    /// Moves all but the newest `linear_min` elements out of `linear` and
    /// merges them into `non_linear`, keeping only the newest element per
    /// compaction key.
    fn compact(&mut self) {
        if self.linear.len() > self.linear_min {
            // `bound` splits off everything older than the `linear_min` tail.
            // `offset0` is such that the element removed at index `i` (global
            // zero-based position `p`) is stored with offset `p + 1`, which is
            // exactly the form `since`'s `take_while(c.offset > offset)` needs.
            let bound = self.linear.len() - self.linear_min;
            let offset0 = self.offset + 1 - self.linear.len();
            // Keys still present in the retained linear tail shadow any
            // compacted value with the same key.
            let retained = self.linear[bound..].iter();
            let mut keys: HashSet<A::Key> = retained.map(|a| a.compaction_key()).collect();
            let removed = self.linear.drain(0..bound);
            // Newly removed elements, tagged with their offsets, newest first.
            let new_compact = removed
                .enumerate()
                .map(|(i, a)| Indexed {
                    offset: i + offset0,
                    value: a,
                })
                .rev();
            let old_compact = self.non_linear.drain(..);
            // The chain is ordered newest first, so the first occurrence of
            // each key (the newest value) wins; duplicates are dropped.
            let compact: Vec<Indexed<A>> = new_compact
                .chain(old_compact)
                .filter(|c| keys.insert(c.value.compaction_key()))
                .collect();
            // Restore the linear buffer's capacity for the next fill cycle.
            self.linear
                .reserve_exact(self.linear_max - self.linear.len());
            self.non_linear = compact;
        }
    }
}
/// A consumer-side cursor over a `Syndicate` log.
#[derive(Debug)]
pub struct Subscription<A, B> {
    /// Head offset at the time the backlog was last refilled.
    offset: usize,
    /// Pending elements, newest first, so `pop()` yields oldest first.
    backlog: Vec<A>,
    /// Watch receiver used to wait for new publications.
    receiver: Receiver<Inner<A>>,
    /// The element type this subscription converts `A` into on `pull`.
    marker: PhantomData<B>,
}
impl<A, B> Subscription<A, B>
where
    A: Clone + TryInto<B>,
{
    /// Waits for and returns the next element that converts into `B`.
    ///
    /// Elements whose `TryInto` conversion fails are skipped. Returns `None`
    /// once the publishing side is gone and the backlog is exhausted.
    pub async fn pull(&mut self) -> Option<B> {
        loop {
            match self.backlog.pop() {
                Some(raw) => {
                    // Conversion failures are silently skipped.
                    if let Ok(converted) = raw.try_into() {
                        return Some(converted);
                    }
                }
                None => {
                    // Backlog drained: wait for the log to change, then
                    // refill from the last offset we have seen.
                    if self.receiver.changed().await.is_err() {
                        // Sender dropped; nothing more will ever arrive.
                        return None;
                    }
                    let refreshed = self.receiver.borrow_and_update().since(self.offset);
                    (self.offset, self.backlog) = refreshed;
                }
            }
        }
    }
    /// Wraps this subscription so that clones of it can be pulled from
    /// several tasks, serialized by a lock.
    pub fn share(self) -> SharedSubscription<A, B> {
        let shared = Arc::new(Mutex::new(self));
        SharedSubscription { shared }
    }
}
/// A cloneable `Subscription` wrapper; all clones share one cursor, with
/// concurrent `pull`s serialized through an async mutex.
#[derive(Debug, Clone)]
pub struct SharedSubscription<A, B> {
    // Shared, lock-protected underlying subscription.
    shared: Arc<Mutex<Subscription<A, B>>>,
}
impl<A, B> SharedSubscription<A, B>
where
    A: Clone + TryInto<B>,
{
    /// Pulls the next element, serializing access through the shared lock.
    pub async fn pull(&self) -> Option<B> {
        let mut subscription = self.shared.lock().await;
        subscription.pull().await
    }
}
/// A producer handle for a `Syndicate` log; pushes values of type `B`,
/// converting them into the log's element type `A`.
#[derive(Debug, Clone)]
pub struct Publisher<A, B> {
    // Watch sender shared with the owning `Syndicate`.
    sender: Sender<Inner<A>>,
    /// The input type accepted by `push`/`push_all`.
    marker: PhantomData<B>,
}
impl<A, B> Publisher<A, B>
where
    A: Compactable,
    B: Into<A>,
{
    /// Runs `f` against the shared log state via `send_modify` (notifying
    /// all subscribers) and returns `f`'s result.
    fn with_inner<F, X>(&self, f: F) -> X
    where
        F: FnOnce(&mut Inner<A>) -> X,
    {
        let mut result: Option<X> = None;
        self.sender.send_modify(|inner| result = Some(f(inner)));
        // `send_modify` invokes the closure exactly once, so this is set.
        result.unwrap()
    }
    /// Publishes one element, yielding if the log reports backpressure.
    pub async fn push(&self, value: B) {
        let element = value.into();
        let backpressure = self.with_inner(move |inner| inner.push(element));
        if backpressure {
            yield_now().await;
        }
    }
    /// Publishes every element from `values`, yielding between batches so
    /// subscribers can drain whenever the log signals backpressure.
    pub async fn push_all(&self, values: impl IntoIterator<Item = B>) {
        let mut pending = values.into_iter();
        loop {
            let exhausted = self.with_inner(|inner| loop {
                match pending.next() {
                    // Stop this batch as soon as backpressure is signalled.
                    Some(value) => {
                        if inner.push(value.into()) {
                            break false;
                        }
                    }
                    None => break true,
                }
            });
            yield_now().await;
            if exhausted {
                return;
            }
        }
    }
}
/// A compacted element paired with its log position, stored so that
/// `since(o)` includes the element exactly when `offset > o`.
#[derive(Debug)]
struct Indexed<A> {
    offset: usize,
    value: A,
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::{scope, Compactable};
    use rand::Rng;
    use std::iter::repeat_with;
    use tokio::task::JoinSet;

    /// Test element: field 0 is the compaction key (small key space so
    /// collisions occur often), field 1 is a strictly increasing sequence
    /// number used to verify delivery order.
    #[derive(Debug, Clone, PartialEq, Eq)]
    pub struct Message(usize, usize);

    impl Compactable for Message {
        type Key = usize;
        fn compaction_key(&self) -> Self::Key {
            self.0
        }
    }

    /// With publisher and subscriber running concurrently, a subscriber that
    /// starts before publishing begins must receive every message, in order.
    #[tokio::test]
    async fn test_interleaved() {
        let (_, p, mut s, test_data) = fixtures();
        let run_length = test_data.len();
        scope(|tasker: &mut JoinSet<Result<(), String>>| {
            tasker.spawn(async move {
                fill_log(p, test_data).await;
                Ok(())
            });
            tasker.spawn(async move {
                let mut count = 0;
                let mut prev = 0;
                while let Some(Message(_, j)) = s.pull().await {
                    count += 1;
                    // Sequence numbers must be strictly increasing.
                    if j > prev {
                        prev = j
                    } else {
                        return Err(format!("Messages out of order {prev}, {j}"));
                    }
                }
                if count == run_length {
                    Ok(())
                } else {
                    Err(format!("Messages received/sent = {}/{}", count, run_length))
                }
            });
            Ok(())
        })
        .await
        .unwrap();
    }

    /// A subscriber that only starts pulling after all messages are
    /// published must see a compacted view: far fewer messages than were
    /// sent, still in order.
    #[tokio::test]
    async fn test_compaction() {
        let (_, p, mut s, test_data) = fixtures();
        let run_length = test_data.len();
        fill_log(p, test_data).await;
        let mut count = 0;
        let mut prev = 0;
        while let Some(Message(_, j)) = s.pull().await {
            count += 1;
            assert!(j > prev);
            prev = j;
        }
        // Roughly: at most one compacted entry per key (key space is 15)
        // plus the linear tail — far fewer than the 1007 messages published.
        assert!(
            count >= 15 && count < 35,
            "Messages received/sent = {}/{} expected 15..35",
            count,
            run_length
        );
    }

    /// `snapshot(0)` must agree element-for-element with what a subscription
    /// started at offset 0 pulls.
    #[tokio::test]
    async fn test_snapshot() {
        let (l, p, mut s, test_data) = fixtures();
        fill_log(p, test_data).await;
        let (_, results) = l.snapshot(0);
        for m in results {
            assert_eq!(m, s.pull().await.unwrap())
        }
    }

    /// Subscribing at a mid-log offset must yield exactly the messages
    /// published after that offset, matching `snapshot(offset)`.
    #[tokio::test]
    async fn test_subscribe_at() {
        let (l, p1, _, mut test_data_a) = fixtures();
        let p2 = p1.clone();
        // Publish the first half, subscribe at the resulting offset, then
        // publish the second half.
        let test_data_b = test_data_a.split_off(test_data_a.len() / 2);
        fill_log(p1, test_data_a).await;
        let (offset, _) = l.snapshot(0);
        let mut s = l.subscribe_at::<Message>(offset);
        fill_log(p2, test_data_b).await;
        let (_, results_b) = l.snapshot(offset);
        for m in results_b {
            assert_eq!(m, s.pull().await.unwrap())
        }
    }

    /// Builds a small log, one publisher, one subscription from offset 0,
    /// and the randomized test data.
    fn fixtures() -> (
        Syndicate<Message>,
        Publisher<Message, Message>,
        Subscription<Message, Message>,
        Vec<Message>,
    ) {
        let log = empty_log();
        let p = log.publish();
        let s = log.subscribe();
        (log, p, s, data())
    }

    /// Log with small thresholds so compaction triggers often in tests.
    fn empty_log() -> Syndicate<Message> {
        let linear_min = 10;
        let linear_hi = 15;
        let linear_max = 20;
        Syndicate::new(linear_min, linear_hi, linear_max)
    }

    /// 1007 messages with random keys in `0..15` and sequence numbers
    /// `1..=1007`.
    fn data() -> Vec<Message> {
        let key_space = 15;
        let run_length = 1007;
        let arb = repeat_with(|| rand::thread_rng().gen_range(0usize..key_space));
        let seq = (1usize..).into_iter();
        arb.zip(seq)
            .map(|(i, j)| Message(i, j))
            .take(run_length)
            .collect()
    }

    /// Pushes every message through the publisher, awaiting backpressure.
    async fn fill_log(p: Publisher<Message, Message>, test_data: Vec<Message>) {
        for m in test_data {
            p.push(m).await;
        }
    }
}