#![allow(dead_code)]
use std::collections::BTreeMap;
use std::ops::Range;
use std::time::Duration;
use crate::config::SegmentConfig;
use crate::error::Result;
use crate::model::Segment;
use crate::model::SegmentId;
use crate::serde::{SegmentMeta, SegmentMetaKey};
use crate::storage::LogStorageRead as _;
use common::storage::PutOptions;
use common::{PutRecordOp, Record, StorageRead, Ttl};
/// In-memory handle to a log segment: its identifier paired with the
/// persisted metadata loaded from storage.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct LogSegment {
    /// Unique segment identifier (assigned monotonically by the cache).
    id: SegmentId,
    /// Persisted metadata: the segment's start sequence and start time (ms).
    meta: SegmentMeta,
}
impl LogSegment {
pub(crate) fn new(id: SegmentId, meta: SegmentMeta) -> Self {
Self { id, meta }
}
pub fn id(&self) -> SegmentId {
self.id
}
pub fn meta(&self) -> &SegmentMeta {
&self.meta
}
}
impl From<LogSegment> for Segment {
fn from(seg: LogSegment) -> Self {
Segment {
id: seg.id,
start_seq: seg.meta.start_seq,
start_time_ms: seg.meta.start_time_ms,
}
}
}
/// Result of `SegmentCache::assign_segment`: the segment a write was
/// assigned to, and whether it was created by that call.
#[derive(Debug, Clone)]
pub(crate) struct SegmentAssignment {
    /// The segment the write belongs to.
    pub(crate) segment: LogSegment,
    /// True when the segment was newly created, in which case a serialized
    /// metadata record was appended to the caller's record batch.
    pub(crate) is_new: bool,
}
/// Ordered, in-memory view of all known log segments, keyed by start
/// sequence, plus the sealing configuration used when assigning writes.
#[derive(Clone)]
pub(crate) struct SegmentCache {
    /// Segments keyed by `meta.start_seq`; `BTreeMap` keeps them ordered so
    /// `latest()` and range scans are cheap.
    segments: BTreeMap<u64, LogSegment>,
    /// Holds `seal_interval`, which controls when a new segment is rolled.
    config: SegmentConfig,
}
impl SegmentCache {
    /// Load every persisted segment from storage and build the cache.
    ///
    /// # Errors
    /// Propagates any storage error from the segment scan.
    pub(crate) async fn open(storage: &dyn StorageRead, config: SegmentConfig) -> Result<Self> {
        let loaded = storage.scan_segments(0..u32::MAX).await?;
        let mut segments = BTreeMap::new();
        for segment in loaded {
            segments.insert(segment.meta.start_seq, segment);
        }
        Ok(Self { segments, config })
    }

    /// Test-only constructor: an empty cache with default configuration.
    #[cfg(test)]
    fn new() -> Self {
        Self {
            segments: BTreeMap::new(),
            config: SegmentConfig::default(),
        }
    }

    /// The segment with the highest start sequence, if any.
    pub(crate) fn latest(&self) -> Option<LogSegment> {
        self.segments.values().next_back().cloned()
    }

    /// All cached segments, ordered by ascending start sequence.
    pub(crate) fn all(&self) -> Vec<LogSegment> {
        self.segments.values().cloned().collect()
    }

    /// Segments that may contain sequences in `range` (start inclusive,
    /// end exclusive), in ascending order.
    ///
    /// A segment implicitly covers every sequence from its `start_seq` up to
    /// the next segment's `start_seq`, so the newest segment starting at or
    /// before `range.start` is included in addition to every segment starting
    /// strictly inside the range. An empty range yields an empty vec.
    pub(crate) fn find_covering(&self, range: &Range<u64>) -> Vec<LogSegment> {
        if range.start >= range.end {
            return Vec::new();
        }
        let mut result = Vec::new();
        // The newest segment starting at or before `range.start` covers the
        // beginning of the range (segments carry no explicit end).
        let mut iter = self.segments.range(..=range.start).rev();
        if let Some((_, first_seg)) = iter.next() {
            result.push(first_seg.clone());
        }
        // Any segment starting strictly inside the range also overlaps it;
        // saturating_add guards the range.start == u64::MAX edge.
        for (_, seg) in self
            .segments
            .range(range.start.saturating_add(1)..range.end)
        {
            result.push(seg.clone());
        }
        result
    }

    /// Insert (or replace) a segment, keyed by its start sequence.
    pub(crate) fn insert(&mut self, segment: LogSegment) {
        self.segments.insert(segment.meta.start_seq, segment);
    }

    /// Drop the current contents and repopulate from `segments`.
    pub(crate) fn replace_all(&mut self, segments: &[LogSegment]) {
        self.segments.clear();
        for segment in segments {
            self.segments
                .insert(segment.meta.start_seq, segment.clone());
        }
    }

    /// Re-sync the cache from storage.
    ///
    /// With `after_segment_id = Some(id)` this is incremental: only segments
    /// with ids strictly greater than `id` are scanned and merged in. With
    /// `None` the cache is cleared and fully reloaded.
    ///
    /// # Errors
    /// Propagates any storage error from the segment scan.
    pub(crate) async fn refresh(
        &mut self,
        storage: &dyn StorageRead,
        after_segment_id: Option<SegmentId>,
    ) -> Result<()> {
        let loaded = match after_segment_id {
            Some(id) => {
                storage
                    .scan_segments(id.saturating_add(1)..u32::MAX)
                    .await?
            }
            None => storage.scan_segments(0..u32::MAX).await?,
        };
        // Only a full reload discards existing entries; an incremental
        // refresh merges new segments on top of what is already cached.
        if after_segment_id.is_none() {
            self.segments.clear();
        }
        for segment in loaded {
            self.segments.insert(segment.meta.start_seq, segment);
        }
        Ok(())
    }

    /// Pick the segment a write at (`current_time_ms`, `start_seq`) belongs to.
    ///
    /// Rolls a new segment when `force_seal` is set, when no segment exists,
    /// or when the latest segment's age reaches `config.seal_interval`. On a
    /// roll, the new segment's serialized metadata record (with no-expiry TTL)
    /// is appended to `records` so the caller persists it atomically with the
    /// write, and the segment is cached immediately.
    pub(crate) fn assign_segment(
        &mut self,
        current_time_ms: i64,
        start_seq: u64,
        records: &mut Vec<PutRecordOp>,
        force_seal: bool,
    ) -> SegmentAssignment {
        let latest = self.latest();
        let needs_new_segment = force_seal
            || Self::should_roll(self.config.seal_interval, current_time_ms, latest.as_ref());
        if needs_new_segment {
            // saturating_add keeps the id arithmetic consistent with
            // `refresh` and avoids a debug-build overflow panic at the
            // id ceiling (plain `+ 1` would panic there).
            let segment_id = latest.map(|s| s.id.saturating_add(1)).unwrap_or(0);
            let meta = SegmentMeta::new(start_seq, current_time_ms);
            let key = SegmentMetaKey::new(segment_id).serialize();
            let value = meta.serialize();
            records.push(PutRecordOp::new_with_options(
                Record::new(key, value),
                PutOptions { ttl: Ttl::NoExpiry },
            ));
            let segment = LogSegment::new(segment_id, meta);
            self.insert(segment.clone());
            SegmentAssignment {
                segment,
                is_new: true,
            }
        } else {
            SegmentAssignment {
                segment: latest.unwrap(),
                is_new: false,
            }
        }
    }

    /// Whether a new segment should be rolled at `current_time_ms`.
    ///
    /// Always rolls when no segment exists yet; never rolls when no seal
    /// interval is configured; otherwise rolls once the latest segment's age
    /// meets or exceeds the interval.
    fn should_roll(
        seal_interval: Option<Duration>,
        current_time_ms: i64,
        latest: Option<&LogSegment>,
    ) -> bool {
        let Some(latest) = latest else {
            return true;
        };
        let Some(seal_interval) = seal_interval else {
            return false;
        };
        // `as_millis` returns u128; clamp to i64::MAX instead of letting a
        // bare `as` cast truncate/wrap for absurdly large intervals.
        let seal_interval_ms = seal_interval.as_millis().min(i64::MAX as u128) as i64;
        let segment_age_ms = current_time_ms - latest.meta().start_time_ms;
        segment_age_ms >= seal_interval_ms
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for `SegmentCache`: range lookup, persistence round-trips,
    //! segment rolling, and write assignment.
    use super::*;
    use crate::storage::LogStorageWrite;
    use common::Storage;
    use opendata_macros::storage_test;
    // NOTE(review): `Arc` was not imported anywhere in this file although the
    // `#[storage_test]` signatures below use `Arc<dyn Storage>`. If the macro
    // injects it, this explicit import is harmless — confirm against the
    // macro's expansion.
    use std::sync::Arc;

    /// Create the next segment (id = latest + 1, or 0 when empty), persist it
    /// to storage, and insert it into the cache.
    async fn write_segment(
        storage: &dyn common::Storage,
        cache: &mut SegmentCache,
        meta: SegmentMeta,
    ) -> LogSegment {
        let segment_id = match cache.latest() {
            Some(latest) => latest.id + 1,
            None => 0,
        };
        let segment = LogSegment::new(segment_id, meta);
        storage.write_segment(&segment).await.unwrap();
        cache.insert(segment.clone());
        segment
    }

    #[storage_test]
    async fn should_return_none_when_no_segments_exist(storage: Arc<dyn Storage>) {
        let cache = SegmentCache::open(storage.as_ref(), SegmentConfig::default())
            .await
            .unwrap();
        let latest = cache.latest();
        assert!(latest.is_none());
    }

    #[storage_test]
    async fn should_write_first_segment_with_id_zero(storage: Arc<dyn Storage>) {
        let mut cache = SegmentCache::open(storage.as_ref(), SegmentConfig::default())
            .await
            .unwrap();
        let meta = SegmentMeta::new(0, 1000);
        let segment = write_segment(storage.as_ref(), &mut cache, meta.clone()).await;
        assert_eq!(segment.id(), 0);
        assert_eq!(segment.meta(), &meta);
    }

    #[storage_test]
    async fn should_increment_segment_id_on_subsequent_writes(storage: Arc<dyn Storage>) {
        let mut cache = SegmentCache::open(storage.as_ref(), SegmentConfig::default())
            .await
            .unwrap();
        let seg0 = write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(0, 1000)).await;
        let seg1 = write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(100, 2000)).await;
        let seg2 = write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(200, 3000)).await;
        assert_eq!(seg0.id(), 0);
        assert_eq!(seg1.id(), 1);
        assert_eq!(seg2.id(), 2);
    }

    #[storage_test]
    async fn should_return_latest_segment(storage: Arc<dyn Storage>) {
        let mut cache = SegmentCache::open(storage.as_ref(), SegmentConfig::default())
            .await
            .unwrap();
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(0, 1000)).await;
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(100, 2000)).await;
        let latest = cache.latest();
        assert_eq!(latest.unwrap().id(), 1);
    }

    #[storage_test]
    async fn should_scan_all_segments(storage: Arc<dyn Storage>) {
        let mut cache = SegmentCache::open(storage.as_ref(), SegmentConfig::default())
            .await
            .unwrap();
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(0, 1000)).await;
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(100, 2000)).await;
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(200, 3000)).await;
        let segments = cache.all();
        assert_eq!(segments.len(), 3);
        assert_eq!(segments[0].id(), 0);
        assert_eq!(segments[1].id(), 1);
        assert_eq!(segments[2].id(), 2);
    }

    #[storage_test]
    async fn should_persist_segments_to_storage(storage: Arc<dyn Storage>) {
        let mut cache = SegmentCache::open(storage.as_ref(), SegmentConfig::default())
            .await
            .unwrap();
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(0, 1000)).await;
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(100, 2000)).await;
        // A second cache opened from the same storage must see both segments.
        let cache2 = SegmentCache::open(storage.as_ref(), SegmentConfig::default())
            .await
            .unwrap();
        let segments = cache2.all();
        assert_eq!(segments.len(), 2);
        assert_eq!(segments[0].id(), 0);
        assert_eq!(segments[0].meta().start_seq, 0);
        assert_eq!(segments[1].id(), 1);
        assert_eq!(segments[1].meta().start_seq, 100);
    }

    #[storage_test]
    async fn should_find_segments_by_seq_range_all(storage: Arc<dyn Storage>) {
        let mut cache = SegmentCache::open(storage.as_ref(), SegmentConfig::default())
            .await
            .unwrap();
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(0, 1000)).await;
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(100, 2000)).await;
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(200, 3000)).await;
        let segments = cache.find_covering(&(0..u64::MAX));
        assert_eq!(segments.len(), 3);
    }

    #[storage_test]
    async fn should_find_segments_by_seq_range_single(storage: Arc<dyn Storage>) {
        let mut cache = SegmentCache::open(storage.as_ref(), SegmentConfig::default())
            .await
            .unwrap();
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(0, 1000)).await;
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(100, 2000)).await;
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(200, 3000)).await;
        let segments = cache.find_covering(&(50..60));
        assert_eq!(segments.len(), 1);
        assert_eq!(segments[0].id(), 0);
    }

    #[storage_test]
    async fn should_find_segments_by_seq_range_spanning(storage: Arc<dyn Storage>) {
        let mut cache = SegmentCache::open(storage.as_ref(), SegmentConfig::default())
            .await
            .unwrap();
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(0, 1000)).await;
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(100, 2000)).await;
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(200, 3000)).await;
        let segments = cache.find_covering(&(50..150));
        assert_eq!(segments.len(), 2);
        assert_eq!(segments[0].id(), 0);
        assert_eq!(segments[1].id(), 1);
    }

    #[storage_test]
    async fn should_find_segments_by_seq_range_unbounded_end(storage: Arc<dyn Storage>) {
        let mut cache = SegmentCache::open(storage.as_ref(), SegmentConfig::default())
            .await
            .unwrap();
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(0, 1000)).await;
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(100, 2000)).await;
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(200, 3000)).await;
        let segments = cache.find_covering(&(150..u64::MAX));
        assert_eq!(segments.len(), 2);
        assert_eq!(segments[0].id(), 1);
        assert_eq!(segments[1].id(), 2);
    }

    #[storage_test]
    async fn should_find_no_segments_when_range_before_all(storage: Arc<dyn Storage>) {
        let mut cache = SegmentCache::open(storage.as_ref(), SegmentConfig::default())
            .await
            .unwrap();
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(100, 1000)).await;
        let segments = cache.find_covering(&(0..50));
        assert_eq!(segments.len(), 0);
    }

    #[storage_test]
    async fn should_find_no_segments_when_storage_empty(storage: Arc<dyn Storage>) {
        let cache = SegmentCache::open(storage.as_ref(), SegmentConfig::default())
            .await
            .unwrap();
        let segments = cache.find_covering(&(0..u64::MAX));
        assert_eq!(segments.len(), 0);
    }

    #[storage_test]
    async fn should_find_last_segment_when_range_after_all(storage: Arc<dyn Storage>) {
        let mut cache = SegmentCache::open(storage.as_ref(), SegmentConfig::default())
            .await
            .unwrap();
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(0, 1000)).await;
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(100, 2000)).await;
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(200, 3000)).await;
        // The last segment is open-ended, so it covers any later range.
        let segments = cache.find_covering(&(500..600));
        assert_eq!(segments.len(), 1);
        assert_eq!(segments[0].id(), 2);
    }

    #[storage_test]
    async fn should_find_segment_when_query_starts_at_boundary(storage: Arc<dyn Storage>) {
        let mut cache = SegmentCache::open(storage.as_ref(), SegmentConfig::default())
            .await
            .unwrap();
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(0, 1000)).await;
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(100, 2000)).await;
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(200, 3000)).await;
        let segments = cache.find_covering(&(100..150));
        assert_eq!(segments.len(), 1);
        assert_eq!(segments[0].id(), 1);
    }

    #[storage_test]
    async fn should_find_segments_with_unbounded_start(storage: Arc<dyn Storage>) {
        let mut cache = SegmentCache::open(storage.as_ref(), SegmentConfig::default())
            .await
            .unwrap();
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(0, 1000)).await;
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(100, 2000)).await;
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(200, 3000)).await;
        let segments = cache.find_covering(&(0..150));
        assert_eq!(segments.len(), 2);
        assert_eq!(segments[0].id(), 0);
        assert_eq!(segments[1].id(), 1);
    }

    // The `should_roll` tests below are synchronous — plain `#[test]` suffices;
    // `#[tokio::test]` spun up an async runtime for nothing. The first test was
    // also renamed: it previously claimed "returns_false" while asserting true
    // (with no latest segment, `should_roll` rolls regardless of interval).
    #[test]
    fn should_roll_returns_true_when_no_interval_and_no_segment() {
        let should_roll = SegmentCache::should_roll(None, 1000, None);
        assert!(should_roll);
    }

    #[test]
    fn should_roll_returns_true_when_no_segments_exist() {
        let seal_interval = Some(Duration::from_secs(3600));
        let should_roll = SegmentCache::should_roll(seal_interval, 1000, None);
        assert!(should_roll);
    }

    #[test]
    fn should_roll_returns_false_when_within_interval() {
        let seal_interval = Some(Duration::from_secs(3600));
        let segment = LogSegment::new(0, SegmentMeta::new(0, 1000));
        let current_time_ms = 1000 + 30 * 60 * 1000;
        let should_roll = SegmentCache::should_roll(seal_interval, current_time_ms, Some(&segment));
        assert!(!should_roll);
    }

    #[test]
    fn should_roll_returns_true_when_interval_exceeded() {
        let seal_interval = Some(Duration::from_secs(3600));
        let segment = LogSegment::new(0, SegmentMeta::new(0, 1000));
        let current_time_ms = 1000 + 2 * 60 * 60 * 1000;
        let should_roll = SegmentCache::should_roll(seal_interval, current_time_ms, Some(&segment));
        assert!(should_roll);
    }

    #[test]
    fn should_roll_returns_true_when_exactly_at_interval() {
        // Boundary case: age == interval must roll (`>=` comparison).
        let seal_interval = Some(Duration::from_secs(3600));
        let segment = LogSegment::new(0, SegmentMeta::new(0, 1000));
        let current_time_ms = 1000 + 60 * 60 * 1000;
        let should_roll = SegmentCache::should_roll(seal_interval, current_time_ms, Some(&segment));
        assert!(should_roll);
    }

    #[test]
    fn should_roll_returns_false_without_seal_interval_when_segment_exists() {
        let segment = LogSegment::new(0, SegmentMeta::new(0, 1000));
        let should_roll = SegmentCache::should_roll(None, 999999999, Some(&segment));
        assert!(!should_roll);
    }

    #[storage_test]
    async fn assign_segment_creates_first_segment_when_none_exist(storage: Arc<dyn Storage>) {
        let mut cache = SegmentCache::open(storage.as_ref(), SegmentConfig::default())
            .await
            .unwrap();
        let mut records = Vec::new();
        let assignment = cache.assign_segment(1000, 0, &mut records, false);
        assert!(assignment.is_new);
        assert_eq!(assignment.segment.id(), 0);
        assert_eq!(assignment.segment.meta().start_seq, 0);
        assert_eq!(assignment.segment.meta().start_time_ms, 1000);
        assert_eq!(records.len(), 1);
        assert_eq!(cache.latest().unwrap().id(), 0);
    }

    #[storage_test]
    async fn assign_segment_returns_existing_segment_when_within_interval(
        storage: Arc<dyn Storage>,
    ) {
        let config = SegmentConfig {
            seal_interval: Some(Duration::from_secs(3600)),
        };
        let mut cache = SegmentCache::open(storage.as_ref(), config).await.unwrap();
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(0, 1000)).await;
        let mut records = Vec::new();
        let current_time_ms = 1000 + 30 * 60 * 1000;
        let assignment = cache.assign_segment(current_time_ms, 100, &mut records, false);
        assert!(!assignment.is_new);
        assert_eq!(assignment.segment.id(), 0);
        assert_eq!(records.len(), 0);
        assert_eq!(cache.all().len(), 1);
    }

    #[storage_test]
    async fn assign_segment_creates_new_segment_when_interval_exceeded(storage: Arc<dyn Storage>) {
        let config = SegmentConfig {
            seal_interval: Some(Duration::from_secs(3600)),
        };
        let mut cache = SegmentCache::open(storage.as_ref(), config).await.unwrap();
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(0, 1000)).await;
        let mut records = Vec::new();
        let current_time_ms = 1000 + 2 * 60 * 60 * 1000;
        let assignment = cache.assign_segment(current_time_ms, 100, &mut records, false);
        assert!(assignment.is_new);
        assert_eq!(assignment.segment.id(), 1);
        assert_eq!(assignment.segment.meta().start_seq, 100);
        assert_eq!(records.len(), 1);
        assert_eq!(cache.all().len(), 2);
    }

    #[storage_test]
    async fn assign_segment_force_seal_creates_new_segment(storage: Arc<dyn Storage>) {
        let config = SegmentConfig {
            seal_interval: Some(Duration::from_secs(3600)),
        };
        let mut cache = SegmentCache::open(storage.as_ref(), config).await.unwrap();
        write_segment(storage.as_ref(), &mut cache, SegmentMeta::new(0, 1000)).await;
        let mut records = Vec::new();
        // Within the interval, but force_seal must still roll.
        let current_time_ms = 1000 + 30 * 60 * 1000;
        let assignment = cache.assign_segment(current_time_ms, 100, &mut records, true);
        assert!(assignment.is_new);
        assert_eq!(assignment.segment.id(), 1);
        assert_eq!(cache.all().len(), 2);
    }

    #[storage_test]
    async fn assign_segment_creates_correct_segment_meta_record(storage: Arc<dyn Storage>) {
        let mut cache = SegmentCache::open(storage.as_ref(), SegmentConfig::default())
            .await
            .unwrap();
        let mut records = Vec::new();
        let assignment = cache.assign_segment(5000, 42, &mut records, false);
        assert_eq!(records.len(), 1);
        // The emitted record must round-trip through the serde layer.
        let key = SegmentMetaKey::deserialize(&records[0].record.key).unwrap();
        let meta = SegmentMeta::deserialize(&records[0].record.value).unwrap();
        assert_eq!(key.segment_id, assignment.segment.id());
        assert_eq!(meta.start_seq, 42);
        assert_eq!(meta.start_time_ms, 5000);
        assert_eq!(records[0].options, PutOptions { ttl: Ttl::NoExpiry })
    }
}