1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
use crate::error::Error; use crate::message::Message; use crate::reader::Reader; use crate::writer::Writer; use serde_json::json; use sqlx::PgPool; use uuid::Uuid; pub struct Subscription<'a> { reader: Reader<'a>, writer: Writer<'a>, current_position: i64, messages_since_last_position_write: i64, subscriber_stream_name: String, stream_name: String, position_update_interval: i64, origin_stream_name: Option<String>, consumer_group_member: Option<i64>, consumer_group_size: Option<i64>, } impl<'a> Subscription<'a> { pub fn new( pool: &'a PgPool, stream_name: String, subscriber_id: String, position_update_interval: Option<i64>, origin_stream_name: Option<String>, consumer_group_member: Option<i64>, consumer_group_size: Option<i64>, ) -> Self { let reader = Reader::new(&pool); let writer = Writer::new(&pool); let current_position = 0; let messages_since_last_position_write = 0; let subscriber_stream_name = format!("subscriber-{}", subscriber_id); Self { reader, writer, stream_name, origin_stream_name, position_update_interval: position_update_interval.unwrap_or(100), current_position, messages_since_last_position_write, subscriber_stream_name, consumer_group_member, consumer_group_size, } } pub async fn load_position(&mut self) -> Result<i64, Error> { match self .reader .get_last_stream_message(&self.subscriber_stream_name) .await? { Some(message) => { self.current_position = message.data["position"].as_i64().unwrap_or(0); Ok(self.current_position) } None => Ok(0), } } pub async fn poll(&mut self, messages_per_tick: Option<i64>) -> Result<Vec<Message>, Error> { log::trace!("polling"); let messages = self .reader .get_category_messages( &self.stream_name, Some(self.current_position + 1), messages_per_tick, self.origin_stream_name.as_deref(), self.consumer_group_member, self.consumer_group_size, None, ) .await?; Ok(messages) } pub async fn update_read_position(&mut self, position: i64) -> Result<(), Error> { self.current_position = position; self.messages_since_last_position_write += 1; if self.messages_since_last_position_write % self.position_update_interval == 0 { self.write_position(position).await?; } Ok(()) } pub async fn write_position(&self, position: i64) -> Result<(), Error> { let data = json!({ "position": position }); self .writer .write_message( Uuid::new_v4(), &self.subscriber_stream_name, &"Read", data, None, None, ) .await?; Ok(()) } }