// message_db 0.0.5
//
// Event-sourced autonomous services for Rust.
// Documentation
use crate::message::Message;
use crate::reader::Reader;
use crate::writer::Writer;
use serde_json::json;
use sqlx::{Error, PgPool};
use uuid::Uuid;

/// A subscriber's cursor over the messages of one category.
///
/// The cursor position is kept in memory and is periodically persisted as a
/// message on the subscriber's own `subscriber-<id>` stream.
pub struct Subscription<'a> {
  /// Used to fetch category messages and the last persisted position.
  reader: Reader<'a>,
  /// Used to persist the read position.
  writer: Writer<'a>,

  /// Name handed to the reader as the category to poll.
  stream_name: String,
  /// Stream on which the read position is stored (`subscriber-<id>`).
  subscriber_stream_name: String,

  /// Last acknowledged position; polling starts at `current_position + 1`.
  current_position: i64,
  /// Counter advanced by `update_read_position`; drives the periodic
  /// position write.
  messages_since_last_position_write: i64,
  /// How many position updates pass between two persisted positions.
  position_update_interval: i64,

  // Optional values forwarded unchanged to the reader's category query.
  origin_stream_name: Option<String>,
  consumer_group_member: Option<i64>,
  consumer_group_size: Option<i64>,
}

impl<'a> Subscription<'a> {
  pub fn new(
    reader: Reader<'a>,
    writer: Writer<'a>,

    stream_name: String,
    subscriber_id: String,

    position_update_interval: Option<i64>,
    origin_stream_name: Option<String>,
    consumer_group_member: Option<i64>,
    consumer_group_size: Option<i64>,
  ) -> Self {
    let current_position = 0;
    let messages_since_last_position_write = 0;
    let subscriber_stream_name = format!("subscriber-{}", subscriber_id);

    Self {
      reader,
      writer,
      stream_name,

      origin_stream_name,
      position_update_interval: position_update_interval.unwrap_or(100),

      current_position,
      messages_since_last_position_write,
      subscriber_stream_name,
      consumer_group_member,
      consumer_group_size,
    }
  }

  pub async fn load_position(&mut self) {
    match self
      .reader
      .get_last_stream_message(&self.subscriber_stream_name)
      .await
    {
      Ok(message) => {
        self.current_position = message.data["position"].as_i64().unwrap_or(0);
      }
      Err(_) => {}
    }
  }

  pub async fn poll(&mut self, messages_per_tick: Option<i64>) -> Result<Vec<Message>, Error> {
    log::trace!("polling");

    self
      .reader
      .get_category_messages(
        &self.stream_name,
        Some(self.current_position + 1),
        messages_per_tick,
        self.origin_stream_name.as_deref(),
        self.consumer_group_member,
        self.consumer_group_size,
        None,
      )
      .await
  }

  pub async fn update_read_position(&mut self, position: i64) {
    self.current_position = position;
    self.messages_since_last_position_write += 1;

    if self.messages_since_last_position_write % self.position_update_interval == 0 {
      self.write_position(position).await;
    }
  }

  pub async fn write_position(&self, position: i64) {
    let data = json!({ "position": position });

    self
      .writer
      .write_message(
        Uuid::new_v4(),
        &self.subscriber_stream_name,
        &"Read",
        data,
        None,
        None,
      )
      .await
      .unwrap();
  }
}

/// Factory for [`Subscription`]s that share one reader/writer pair.
pub struct Subscriber<'a> {
  /// Writer passed to every subscription created by `subscribe`.
  writer: Writer<'a>,
  /// Reader passed to every subscription created by `subscribe`.
  reader: Reader<'a>,
}

impl<'a> Subscriber<'a> {
  /// Builds a subscriber whose reader and writer are both created from `pool`.
  pub fn new(pool: &'a PgPool) -> Self {
    let reader = Reader::new(&pool);
    let writer = Writer::new(&pool);

    Self { reader, writer }
  }

  /// Creates a [`Subscription`] for `subscriber_id` over `stream_name`.
  ///
  /// All arguments are forwarded unchanged to [`Subscription::new`], together
  /// with this subscriber's reader and writer.
  ///
  /// The returned subscription carries the pool lifetime `'a` rather than the
  /// lifetime of the `&self` borrow, so it may outlive this `Subscriber`.
  pub fn subscribe(
    &self,
    stream_name: String,
    subscriber_id: String,

    position_update_interval: Option<i64>,
    origin_stream_name: Option<String>,
    consumer_group_member: Option<i64>,
    consumer_group_size: Option<i64>,
  ) -> Subscription<'a> {
    // NOTE(review): moving `self.reader` / `self.writer` out of `&self` relies
    // on `Reader` and `Writer` being `Copy` - confirm in their definitions.
    Subscription::new(
      self.reader,
      self.writer,
      stream_name,
      subscriber_id,
      position_update_interval,
      origin_stream_name,
      consumer_group_member,
      consumer_group_size,
    )
  }
}