1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use crate::error::Error;
use crate::message::Message;
use crate::reader::Reader;
use crate::writer::Writer;
use serde_json::json;
use sqlx::PgPool;
use uuid::Uuid;

pub struct Subscription<'a> {
  reader: Reader<'a>,
  writer: Writer<'a>,

  current_position: i64,
  messages_since_last_position_write: i64,

  subscriber_stream_name: String,
  stream_name: String,

  position_update_interval: i64,
  origin_stream_name: Option<String>,
  consumer_group_member: Option<i64>,
  consumer_group_size: Option<i64>,
}

impl<'a> Subscription<'a> {
  pub fn new(
    pool: &'a PgPool,

    stream_name: String,
    subscriber_id: String,

    position_update_interval: Option<i64>,
    origin_stream_name: Option<String>,
    consumer_group_member: Option<i64>,
    consumer_group_size: Option<i64>,
  ) -> Self {
    let reader = Reader::new(&pool);
    let writer = Writer::new(&pool);

    let current_position = 0;
    let messages_since_last_position_write = 0;
    let subscriber_stream_name = format!("subscriber-{}", subscriber_id);

    Self {
      reader,
      writer,
      stream_name,

      origin_stream_name,
      position_update_interval: position_update_interval.unwrap_or(100),

      current_position,
      messages_since_last_position_write,
      subscriber_stream_name,
      consumer_group_member,
      consumer_group_size,
    }
  }

  pub async fn load_position(&mut self) -> Result<i64, Error> {
    match self
      .reader
      .get_last_stream_message(&self.subscriber_stream_name)
      .await?
    {
      Some(message) => {
        self.current_position = message.data["position"].as_i64().unwrap_or(0);
        Ok(self.current_position)
      }
      None => Ok(0),
    }
  }

  pub async fn poll(&mut self, messages_per_tick: Option<i64>) -> Result<Vec<Message>, Error> {
    log::trace!("polling");

    let messages = self
      .reader
      .get_category_messages(
        &self.stream_name,
        Some(self.current_position + 1),
        messages_per_tick,
        self.origin_stream_name.as_deref(),
        self.consumer_group_member,
        self.consumer_group_size,
        None,
      )
      .await?;

    Ok(messages)
  }

  pub async fn update_read_position(&mut self, position: i64) -> Result<(), Error> {
    self.current_position = position;
    self.messages_since_last_position_write += 1;

    if self.messages_since_last_position_write % self.position_update_interval == 0 {
      self.write_position(position).await?;
    }

    Ok(())
  }

  pub async fn write_position(&self, position: i64) -> Result<(), Error> {
    let data = json!({ "position": position });

    self
      .writer
      .write_message(
        Uuid::new_v4(),
        &self.subscriber_stream_name,
        &"Read",
        data,
        None,
        None,
      )
      .await?;

    Ok(())
  }
}