use std::sync::Arc;
use super::{
ConsumerCursor, DurabilityError, DurableChannel, DurableConversation, DurableStore,
MessageEnvelope, replay_from,
};
#[cfg(test)]
mod tests;
const READ_BATCH_SIZE: usize = 1_024;
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RecoveredCursor {
pub cursor: ConsumerCursor,
pub replayed_messages: Vec<MessageEnvelope>,
}
pub async fn recover_partition_sequences(
channel_id: &str,
partition_count: usize,
store: &dyn DurableStore,
) -> Result<Vec<u64>, DurabilityError> {
let mut next_sequences = Vec::with_capacity(partition_count);
for partition_index in 0..partition_count {
let stream_key = partition_stream_key(channel_id, partition_index);
next_sequences.push(recover_partition_sequence(store, &stream_key).await?);
}
Ok(next_sequences)
}
pub async fn recover_durable_channel(
channel_id: impl Into<String>,
partition_count: usize,
store: Arc<dyn DurableStore>,
) -> Result<DurableChannel, DurabilityError> {
let channel_id = channel_id.into();
let next_sequences =
recover_partition_sequences(&channel_id, partition_count, store.as_ref()).await?;
DurableChannel::from_recovered_sequences(channel_id, partition_count, store, next_sequences)
}
pub async fn recover_conversation(
conversation_id: impl Into<String>,
store: Arc<dyn DurableStore>,
) -> Result<DurableConversation, DurabilityError> {
DurableConversation::recover(conversation_id, store).await
}
pub async fn recover_cursor(
consumer_id: impl Into<String>,
partition_key: impl Into<String>,
store: &dyn DurableStore,
) -> Result<ConsumerCursor, DurabilityError> {
ConsumerCursor::resume(consumer_id, partition_key, store).await
}
pub async fn recover_cursor_with_replay(
consumer_id: impl Into<String>,
partition_key: impl Into<String>,
store: &dyn DurableStore,
) -> Result<RecoveredCursor, DurabilityError> {
let cursor = recover_cursor(consumer_id, partition_key, store).await?;
let replayed_messages =
replay_from(store, cursor.partition_key(), cursor.current_offset()).await?;
Ok(RecoveredCursor {
cursor,
replayed_messages,
})
}
async fn recover_partition_sequence(
store: &dyn DurableStore,
stream_key: &str,
) -> Result<u64, DurabilityError> {
let mut offset = 0;
let mut last_sequence: Option<u64> = None;
loop {
let batch = store.read_from(stream_key, offset, READ_BATCH_SIZE).await?;
let batch_len = batch.len();
if batch_len == 0 {
break;
}
for stored in &batch {
last_sequence = Some(last_sequence.map_or(stored.sequence, |s| s.max(stored.sequence)));
}
offset = offset.checked_add(len_to_u64(batch_len)?).ok_or_else(|| {
DurabilityError::ConfigError("channel recovery read offset overflow".to_owned())
})?;
if batch_len < READ_BATCH_SIZE {
break;
}
}
last_sequence.map_or(Ok(0), |sequence| {
sequence.checked_add(1).ok_or_else(|| {
DurabilityError::ConfigError(
"sequence number overflow after channel recovery".to_owned(),
)
})
})
}
fn partition_stream_key(channel_id: &str, partition_index: usize) -> String {
format!("{channel_id}:{partition_index}")
}
fn len_to_u64(len: usize) -> Result<u64, DurabilityError> {
u64::try_from(len).map_err(|error| {
DurabilityError::ConfigError(format!(
"channel recovery entry count cannot fit u64: {error}"
))
})
}