use super::types::*;
use crate::error::{Error, Result};
use crate::versioned_messages::MessageSerial;
use async_trait::async_trait;
#[async_trait]
pub trait VersionStore: Send + Sync {
async fn reserve_delivery_position(
&self,
app_id: &str,
channel: &str,
) -> Result<VersionWriteReservation>;
async fn reserve_delivery_positions(
&self,
app_id: &str,
channel: &str,
block_size: u64,
) -> Result<VersionWriteReservationBlock> {
VersionWriteReservationBlock::validate(block_size)?;
if block_size == 1 {
let reservation = self.reserve_delivery_position(app_id, channel).await?;
return Ok(VersionWriteReservationBlock {
stream_id: reservation.stream_id,
start_delivery_serial: reservation.delivery_serial,
len: 1,
});
}
Err(Error::Configuration(
"version store does not support block delivery reservations".to_string(),
))
}
async fn reserve_delivery_position_after(
&self,
app_id: &str,
channel: &str,
after_delivery_serial: u64,
) -> Result<VersionWriteReservation> {
for _ in 0..1024 {
let reservation = self.reserve_delivery_position(app_id, channel).await?;
if reservation.delivery_serial > after_delivery_serial {
return Ok(reservation);
}
}
Err(Error::Internal(format!(
"version store could not reserve delivery_serial greater than {after_delivery_serial}"
)))
}
async fn append_version(&self, record: StoredVersionRecord) -> Result<()>;
async fn get_latest(
&self,
app_id: &str,
channel: &str,
message_serial: &MessageSerial,
) -> Result<Option<StoredVersionRecord>>;
async fn get_versions(&self, request: VersionStoreReadRequest) -> Result<VersionStorePage>;
async fn replay_after(&self, request: VersionReplayRequest)
-> Result<Vec<StoredVersionRecord>>;
async fn latest_by_history(
&self,
app_id: &str,
channel: &str,
) -> Result<Vec<StoredVersionRecord>>;
async fn stream_state(&self, app_id: &str, channel: &str) -> Result<VersionStreamState>;
async fn purge_before(&self, before_ms: i64, batch_size: usize) -> Result<(u64, bool)> {
let _ = (before_ms, batch_size);
Ok((0, false))
}
}
#[derive(Default)]
pub struct NoopVersionStore;
#[async_trait]
impl VersionStore for NoopVersionStore {
async fn reserve_delivery_position(
&self,
_app_id: &str,
_channel: &str,
) -> Result<VersionWriteReservation> {
Err(Error::Configuration(
"Versioned message storage is not configured".to_string(),
))
}
async fn reserve_delivery_positions(
&self,
_app_id: &str,
_channel: &str,
_block_size: u64,
) -> Result<VersionWriteReservationBlock> {
Err(Error::Configuration(
"Versioned message storage is not configured".to_string(),
))
}
async fn append_version(&self, _record: StoredVersionRecord) -> Result<()> {
Err(Error::Configuration(
"Versioned message storage is not configured".to_string(),
))
}
async fn get_latest(
&self,
_app_id: &str,
_channel: &str,
_message_serial: &MessageSerial,
) -> Result<Option<StoredVersionRecord>> {
Err(Error::Configuration(
"Versioned message storage is not configured".to_string(),
))
}
async fn get_versions(&self, _request: VersionStoreReadRequest) -> Result<VersionStorePage> {
Err(Error::Configuration(
"Versioned message storage is not configured".to_string(),
))
}
async fn replay_after(
&self,
_request: VersionReplayRequest,
) -> Result<Vec<StoredVersionRecord>> {
Err(Error::Configuration(
"Versioned message storage is not configured".to_string(),
))
}
async fn latest_by_history(
&self,
_app_id: &str,
_channel: &str,
) -> Result<Vec<StoredVersionRecord>> {
Err(Error::Configuration(
"Versioned message storage is not configured".to_string(),
))
}
async fn stream_state(&self, _app_id: &str, _channel: &str) -> Result<VersionStreamState> {
Err(Error::Configuration(
"Versioned message storage is not configured".to_string(),
))
}
}