use std::{collections::BTreeMap, sync::Arc};
use tokio::sync::RwLock;
use crate::state::SharedState;
use super::{handler::HandlerState, staging::StagingEvent};
/// Identifier of a consumer group; used as the key of the group storage in [`Consumers`].
pub type ConsumerId = String;
/// Key identifying a single [`Consumer`] inside its group's member map.
pub type ConsumerIndex = String;
/// Name of a topic that consumer groups can subscribe to.
pub type Topic = String;
/// NOTE(review): not referenced anywhere in this file — presumably used by other modules; confirm.
pub type PartitionKey = String;
/// Offset into a topic, as passed to `update_offset` / `notify_update`.
pub type TopicOffset = usize;
/// A single consumer endpoint: both halves of a tokio mpsc channel carrying
/// [`StagingEvent`]s.
///
/// The receiver is wrapped in `Arc<Mutex<..>>` so the struct as a whole can be
/// cloned; clones share the same underlying channel.
#[derive(Debug, Clone)]
pub struct Consumer {
/// Sending half of the channel.
pub tx: tokio::sync::mpsc::Sender<StagingEvent>,
/// Receiving half, shared between clones behind an async mutex.
pub rx: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<StagingEvent>>>,
}
/// Cloneable handle to a consumer group.
///
/// All clones point at the same [`InnerConsumerGroup`] through an
/// `Arc<RwLock<..>>`, so an update made through one handle is visible through
/// every other handle.
#[derive(Default, Clone, Debug)]
pub struct ConsumerGroup {
/// Shared, lock-protected group state.
inner: Arc<RwLock<InnerConsumerGroup>>,
}
/// Lock-protected state of a [`ConsumerGroup`].
#[derive(Default, Debug)]
pub struct InnerConsumerGroup {
/// Identifier of this group.
pub id: ConsumerId,
/// Topic passed to the handler when lag is reconciled.
topic: Topic,
/// Most recent offset recorded through `update_offset`.
pub(crate) offset: TopicOffset,
/// Offset up to which `reconcile_lag` has successfully invoked the handler.
pub(crate) cur_offset: TopicOffset,
/// Members of the group keyed by consumer index; `reconcile_lag` uses the
/// first entry in key order.
members: BTreeMap<ConsumerIndex, Consumer>,
}
impl ConsumerGroup {
    /// Builds a group handle around a fresh [`InnerConsumerGroup`] for `id` and `topic`.
    pub fn new(id: impl Into<ConsumerId>, topic: impl Into<Topic>) -> Self {
        let state = InnerConsumerGroup::new(id, topic);
        Self {
            inner: Arc::new(RwLock::new(state)),
        }
    }

    /// Registers `consumer` under `index`, taking the group's write lock.
    ///
    /// An existing member with the same index is replaced.
    pub async fn add_consumer(&self, index: ConsumerIndex, consumer: Consumer) {
        self.inner.write().await.add_consumer(index, consumer);
    }

    /// Records `offset` as the group's latest known offset (write lock).
    pub async fn update_offset(&self, offset: TopicOffset) {
        self.inner.write().await.update_offset(offset);
    }

    /// Runs [`InnerConsumerGroup::reconcile_lag`] while holding the write lock.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the inner reconciliation.
    pub async fn reconcile_lag(&self, state: &SharedState) -> anyhow::Result<()> {
        let mut group = self.inner.write().await;
        group.reconcile_lag(state).await
    }

    /// Returns the latest offset recorded for the group (read lock).
    #[allow(dead_code)]
    pub async fn get_offset(&self) -> TopicOffset {
        self.inner.read().await.offset
    }

    /// Returns a copy of the group's identifier (read lock).
    pub async fn get_id(&self) -> ConsumerId {
        self.inner.read().await.id.clone()
    }
}
impl InnerConsumerGroup {
    /// Creates an empty group for `id` / `topic` with both offsets at zero.
    pub fn new(id: impl Into<ConsumerId>, topic: impl Into<Topic>) -> Self {
        let (id, topic) = (id.into(), topic.into());
        Self {
            id,
            topic,
            offset: 0,
            cur_offset: 0,
            members: BTreeMap::new(),
        }
    }

    /// Inserts `consumer` under `index`; logs a warning when a previous
    /// member with the same index was replaced.
    pub fn add_consumer(&mut self, index: ConsumerIndex, consumer: Consumer) {
        // The key is cloned because it is still needed for the log line below.
        let replaced = self.members.insert(index.clone(), consumer).is_some();
        if replaced {
            tracing::warn!(index = index, "consumer replaced");
        }
    }

    /// Overwrites the latest known offset.
    pub fn update_offset(&mut self, offset: TopicOffset) {
        self.offset = offset;
    }

    /// Forwards the range between the handled offset and the latest offset to
    /// the handler, then marks the latest offset as handled.
    ///
    /// Does nothing when the latest offset is not ahead of the handled one, or
    /// when the group has no members. The first member in key order is the one
    /// passed to the handler.
    ///
    /// # Errors
    ///
    /// Propagates the error from `handle_offset`; in that case the handled
    /// offset is left unchanged.
    pub async fn reconcile_lag(&mut self, state: &SharedState) -> anyhow::Result<()> {
        let (handled, latest) = (self.cur_offset, self.offset);
        if latest <= handled {
            return Ok(());
        }
        let Some((_, consumer)) = self.members.first_key_value() else {
            return Ok(());
        };
        tracing::debug!(consumer_id = self.id, "sending update for consumer");
        let handler = state.handler();
        handler
            .handle_offset(&self.topic, consumer, handled, latest)
            .await?;
        // `self` is exclusively borrowed for the whole call, so `latest` still
        // equals `self.offset` here.
        self.cur_offset = latest;
        Ok(())
    }
}
impl Default for Consumer {
fn default() -> Self {
Self::new()
}
}
impl Consumer {
    /// Creates a consumer backed by a fresh mpsc channel with a capacity of 128 events.
    pub fn new() -> Self {
        let (sender, receiver) = tokio::sync::mpsc::channel(128);
        let shared_receiver = Arc::new(tokio::sync::Mutex::new(receiver));
        Self {
            tx: sender,
            rx: shared_receiver,
        }
    }
}
/// Registry of consumer groups and their topic subscriptions.
///
/// Both maps live behind `Arc<RwLock<..>>`, so clones of `Consumers` share the
/// same underlying data.
#[derive(Clone)]
pub struct Consumers {
/// Consumer groups keyed by group id.
storage: Arc<RwLock<BTreeMap<ConsumerId, ConsumerGroup>>>,
/// For each topic, the ids of the groups subscribed to it.
subscriptions: Arc<RwLock<BTreeMap<Topic, Vec<ConsumerId>>>>,
}
impl Consumers {
    /// Creates an empty registry with no groups and no subscriptions.
    pub fn new() -> Self {
        Self {
            storage: Arc::default(),
            subscriptions: Arc::default(),
        }
    }

    /// Adds a new [`Consumer`] under `index` to the group `id`, creating the
    /// group for `topic` if it does not exist yet, and subscribes the group to
    /// `topic`.
    ///
    /// Returns the newly created consumer; the current implementation always
    /// returns `Ok(Some(_))`.
    ///
    /// NOTE: when the group already exists it keeps the topic it was created
    /// with, but it is still added to the subscription list of `topic`.
    pub async fn add_consumer(
        &self,
        id: impl Into<ConsumerId>,
        index: impl Into<ConsumerIndex>,
        topic: impl Into<Topic>,
    ) -> anyhow::Result<Option<Consumer>> {
        let id: ConsumerId = id.into();
        let index: ConsumerIndex = index.into();
        let topic: Topic = topic.into();
        let consumer = Consumer::default();

        {
            let mut storage = self.storage.write().await;
            // Single lookup: fetch the group or create it on first use.
            let consumer_group = storage
                .entry(id.clone())
                .or_insert_with(|| ConsumerGroup::new(&id, &topic));
            consumer_group.add_consumer(index, consumer.clone()).await;
        }

        {
            let mut subscriptions = self.subscriptions.write().await;
            let subscribers = subscriptions.entry(topic).or_default();
            // Keep the subscriber list free of duplicates.
            if !subscribers.contains(&id) {
                subscribers.push(id);
            }
        }

        Ok(Some(consumer))
    }

    /// Returns a handle to the group registered under `id`, if any.
    // Not called from non-test code in this file, hence the allow.
    #[allow(dead_code)]
    pub async fn get_consumer_group(&self, id: impl Into<ConsumerId>) -> Option<ConsumerGroup> {
        let id: ConsumerId = id.into();
        let storage = self.storage.read().await;
        storage.get(&id).cloned()
    }

    /// Returns handles to all registered groups, ordered by group id.
    pub async fn get_consumer_groups(&self) -> Vec<ConsumerGroup> {
        let storage = self.storage.read().await;
        storage.values().cloned().collect()
    }

    /// Records `offset` as the latest offset on every group subscribed to `topic`.
    ///
    /// Topics without subscriptions and subscribed ids without a stored group
    /// are logged and skipped; the current implementation never returns an error.
    pub async fn notify_update(
        &self,
        topic: impl Into<Topic>,
        offset: impl Into<TopicOffset>,
    ) -> anyhow::Result<()> {
        let topic: Topic = topic.into();
        let offset: TopicOffset = offset.into();

        let subscriptions = self.subscriptions.read().await;
        let Some(subscription) = subscriptions.get(&topic) else {
            tracing::debug!(topic = &topic, "no subscription for topic");
            return Ok(());
        };

        // A read lock is enough: the map itself is not modified, and each
        // group synchronises its own state internally.
        let storage = self.storage.read().await;
        for consumer_id in subscription {
            match storage.get(consumer_id) {
                Some(consumer_group) => {
                    consumer_group.update_offset(offset).await;
                }
                None => {
                    tracing::trace!(
                        topic = &topic,
                        consumer_id = &consumer_id,
                        "found no consumer"
                    )
                }
            }
        }
        Ok(())
    }
}
/// Access to the [`Consumers`] registry held by a state object.
pub trait ConsumersState {
/// Returns an owned [`Consumers`] handle.
fn consumers(&self) -> Consumers;
}
impl ConsumersState for SharedState {
    /// Returns a clone of the state's `consumers` field.
    fn consumers(&self) -> Consumers {
        let registry = &self.consumers;
        registry.clone()
    }
}
#[cfg(test)]
mod test {
    use tracing_test::traced_test;
    use crate::services::staging::{Staging, StagingEvent};
    use super::*;

    /// A freshly added consumer group starts at offset zero.
    #[tokio::test]
    async fn can_add_consumer() -> anyhow::Result<()> {
        let group_id = "some-consumer-id";
        let member_index = "some-consumer-index";
        let topic_name = "some-topic";

        let registry = Consumers::new();
        registry
            .add_consumer(group_id, member_index, topic_name)
            .await?;

        let group = registry.get_consumer_group(group_id).await.unwrap();
        assert_eq!(0, group.get_offset().await);
        Ok(())
    }

    /// Notifying a topic update moves the subscribed group's offset.
    #[tokio::test]
    #[traced_test]
    async fn can_notify_consumer() -> anyhow::Result<()> {
        let group_id = String::from("some-consumer-id");
        let member_index = String::from("some-consumer-index");
        let topic_name = String::from("some-topic");
        let latest_offset = 9usize;

        // Publish ten events to the topic before registering the consumer.
        let staging = Staging::new().await?;
        for _ in 0..10 {
            let event = StagingEvent {
                topic: topic_name.clone(),
                published: chrono::Utc::now(),
                value: Vec::new(),
            };
            let published_offset = staging.publish(event).await?;
            tracing::trace!("published offset: {}", published_offset);
        }

        let registry = Consumers::new();
        registry
            .add_consumer(&group_id, &member_index, &topic_name)
            .await?;

        let before = registry.get_consumer_group(&group_id).await.unwrap();
        assert_eq!(0, before.get_offset().await);

        registry.notify_update(&topic_name, latest_offset).await?;

        let after = registry.get_consumer_group(&group_id).await.unwrap();
        assert_eq!(9, after.get_offset().await);
        Ok(())
    }
}