use crate::record::LogRecord;
use chrono::{DateTime, Utc};
use std::sync::Arc;
use tracing::instrument;
/// Number of records a segment is sized for when the caller does not supply
/// an explicit segment capacity.
const DEFAULT_SEGMENT_CAPACITY: usize = 64 * 1024;
/// Picks a segment capacity (in records) for a store holding `total_records`.
///
/// The capacity grows in four tiers: 16 Ki below 100 000 records, 64 Ki below
/// one million, 128 Ki below ten million, and 256 Ki from there on.
fn optimal_segment_capacity(total_records: usize) -> usize {
    const KIB: usize = 1024;

    let tier = if total_records < 100_000 {
        16
    } else if total_records < 1_000_000 {
        64
    } else if total_records < 10_000_000 {
        128
    } else {
        256
    };
    tier * KIB
}
/// A contiguous run of shared log records.
///
/// `min_timestamp`/`max_timestamp` read the first and last element, so the
/// records are expected to be kept in ascending timestamp order.
#[derive(Debug)]
struct Segment {
/// The records held by this segment.
records: Vec<Arc<LogRecord>>,
/// Set by `from_sorted` and `freeze`; cleared when a segment is reused as
/// the active segment.
frozen: bool,
}
impl Segment {
fn new(capacity: usize) -> Self {
Self {
records: Vec::with_capacity(capacity),
frozen: false,
}
}
fn from_sorted(records: Vec<Arc<LogRecord>>) -> Self {
Self {
records,
frozen: true,
}
}
fn min_timestamp(&self) -> Option<DateTime<Utc>> {
self.records.first().map(|r| r.timestamp)
}
fn max_timestamp(&self) -> Option<DateTime<Utc>> {
self.records.last().map(|r| r.timestamp)
}
fn len(&self) -> usize {
self.records.len()
}
fn is_empty(&self) -> bool {
self.records.is_empty()
}
fn insert(&mut self, record: Arc<LogRecord>) {
let pos = self
.records
.partition_point(|r| r.timestamp <= record.timestamp);
self.records.insert(pos, record);
}
fn push(&mut self, record: Arc<LogRecord>) {
self.records.push(record);
}
fn freeze(&mut self) {
self.frozen = true;
}
}
/// Iterator over a sequence of borrowed record slices, yielding the records
/// of each slice in turn.
pub struct SegmentRangeIter<'a> {
/// The slices to walk, in iteration order.
slices: Vec<&'a [Arc<LogRecord>]>,
/// Index into `slices` of the slice currently being read.
slice_idx: usize,
/// Position of the next record inside the current slice.
pos: usize,
}
impl<'a> SegmentRangeIter<'a> {
/// Creates an iterator positioned at the first record of the first slice.
fn new(slices: Vec<&'a [Arc<LogRecord>]>) -> Self {
Self {
slices,
slice_idx: 0,
pos: 0,
}
}
}
impl<'a> Iterator for SegmentRangeIter<'a> {
    type Item = &'a LogRecord;

    /// Yields the next record, moving on to the following slice whenever the
    /// current one is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Running out of slices ends the iteration.
            let current = *self.slices.get(self.slice_idx)?;
            match current.get(self.pos) {
                Some(record) => {
                    self.pos += 1;
                    return Some(&**record);
                }
                None => {
                    self.slice_idx += 1;
                    self.pos = 0;
                }
            }
        }
    }

    /// Reports the exact number of records still to be yielded.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let pending = &self.slices[self.slice_idx..];
        let total: usize = pending.iter().map(|s| s.len()).sum();
        // `pos` only counts against the slice currently being read.
        let consumed = if pending.is_empty() { 0 } else { self.pos };
        let remaining = total - consumed;
        (remaining, Some(remaining))
    }
}
// `size_hint` returns identical lower and upper bounds, which is what
// `ExactSizeIterator` relies on for its default `len`.
impl<'a> ExactSizeIterator for SegmentRangeIter<'a> {}
/// Construction options for [`LogStore::with_config`].
#[derive(Debug, Clone)]
pub struct LogStoreConfig {
/// Explicit segment capacity in records; `None` selects the built-in
/// default. Supplying a value disables auto-tuning.
pub segment_capacity: Option<usize>,
/// Whether the store may retune its segment capacity after batch inserts.
/// Only honoured when `segment_capacity` is `None`.
pub auto_tune: bool,
}
impl Default for LogStoreConfig {
/// No explicit segment capacity, auto-tuning enabled.
fn default() -> Self {
Self {
segment_capacity: None,
auto_tune: true,
}
}
}
/// Log record store split into a list of frozen segments, one active segment,
/// and a side buffer for records that arrive out of order.
#[derive(Debug)]
pub struct LogStore {
/// Completed segments, in sequence order.
frozen: Vec<Segment>,
/// Segment that receives in-order inserts.
active: Segment,
/// Records that could not be placed in the active segment; folded into the
/// main sequence by `compact_ooo`.
ooo_buffer: Vec<Arc<LogRecord>>,
/// Target number of records per segment.
segment_capacity: usize,
/// Number of records in `frozen` plus `active` (excludes `ooo_buffer`).
main_len: usize,
/// Whether batch inserts may retune `segment_capacity`.
auto_tune: bool,
/// Set when the caller supplied an explicit segment capacity.
user_override: bool,
}
impl LogStore {
pub fn new() -> Self {
Self {
frozen: Vec::new(),
active: Segment::new(DEFAULT_SEGMENT_CAPACITY),
ooo_buffer: Vec::new(),
segment_capacity: DEFAULT_SEGMENT_CAPACITY,
main_len: 0,
auto_tune: true,
user_override: false,
}
}
/// Creates an empty store from `config`.
///
/// An explicit `segment_capacity` marks the store as user-overridden and
/// switches auto-tuning off; otherwise the default capacity is used and
/// auto-tuning follows `config.auto_tune`.
///
/// A requested capacity of `0` is raised to `1`: batch inserts split records
/// with `slice::chunks`, which panics on a zero chunk size.
pub fn with_config(config: LogStoreConfig) -> Self {
    let user_override = config.segment_capacity.is_some();
    let capacity = config
        .segment_capacity
        .unwrap_or(DEFAULT_SEGMENT_CAPACITY)
        .max(1);
    Self {
        frozen: Vec::new(),
        active: Segment::new(capacity),
        ooo_buffer: Vec::new(),
        segment_capacity: capacity,
        main_len: 0,
        auto_tune: config.auto_tune && !user_override,
        user_override,
    }
}
/// Creates an empty store; currently identical to [`LogStore::new`].
///
/// NOTE(review): `_capacity` is ignored, so callers get the default segment
/// capacity regardless of what they pass — confirm whether this is a
/// deliberate compatibility shim or an unfinished constructor.
pub fn with_capacity(_capacity: usize) -> Self {
Self::new()
}
/// Inserts a single record.
///
/// Placement, in order of preference:
/// 1. appended to the active segment when the record is not older than the
///    newest record of both the active and the last frozen segment;
/// 2. sorted into the active segment when it falls inside the active
///    segment's current time span;
/// 3. otherwise parked in the out-of-order buffer, which is compacted once it
///    grows past its threshold.
///
/// A record older than the newest frozen record is never placed in an
/// *empty* active segment: doing so would put it after newer frozen records
/// and break the store's timestamp ordering. Such records go to the
/// out-of-order buffer instead.
pub fn insert(&mut self, record: LogRecord) {
    let record = Arc::new(record);
    let ts = record.timestamp;

    let after_active_tail = self.active.max_timestamp().is_none_or(|max| ts >= max);
    let after_frozen_tail = self
        .frozen
        .last()
        .and_then(|seg| seg.max_timestamp())
        .is_none_or(|max| ts >= max);

    if after_active_tail && after_frozen_tail {
        self.active.push(record);
        self.main_len += 1;
        self.maybe_freeze_active();
        return;
    }

    // Only a non-empty active segment defines a span the record can fall into.
    let within_active = self.active.min_timestamp().is_some_and(|min| ts >= min);
    if within_active {
        self.active.insert(record);
        self.main_len += 1;
        self.maybe_freeze_active();
        return;
    }

    self.ooo_buffer.push(record);
    self.maybe_compact_ooo();
}
#[instrument(skip(self, batch), fields(batch_size = batch.len()))]
pub fn insert_batch(&mut self, mut batch: Vec<LogRecord>) {
if batch.is_empty() {
return;
}
batch.sort_by_key(|r| r.timestamp);
let batch: Vec<Arc<LogRecord>> = batch.into_iter().map(Arc::new).collect();
if self.main_len == 0 && self.active.is_empty() {
self.insert_batch_empty(batch);
} else {
self.insert_batch_merge(batch);
}
self.main_len = self.frozen.iter().map(|s| s.len()).sum::<usize>() + self.active.len();
self.maybe_auto_tune();
}
fn insert_batch_empty(&mut self, batch: Vec<Arc<LogRecord>>) {
for chunk in batch.chunks(self.segment_capacity) {
let seg = Segment::from_sorted(chunk.to_vec());
self.frozen.push(seg);
}
if let Some(last) = self.frozen.last() {
if last.len() < self.segment_capacity {
let mut seg = self.frozen.pop().unwrap();
seg.frozen = false;
self.active = seg;
}
}
}
fn insert_batch_merge(&mut self, batch: Vec<Arc<LogRecord>>) {
let batch_min = batch.first().unwrap().timestamp;
let batch_max = batch.last().unwrap().timestamp;
let store_max = self
.active
.max_timestamp()
.or_else(|| self.frozen.last().and_then(|s| s.max_timestamp()));
if store_max.is_none_or(|max| batch_min >= max) {
let mut combined = Vec::with_capacity(self.active.len() + batch.len());
combined.append(&mut self.active.records);
let merged = Self::merge_sorted(combined, batch);
self.active = Segment::new(self.segment_capacity);
self.append_records_as_segments(merged);
return;
}
let first_affected = self
.frozen
.partition_point(|s| s.max_timestamp().is_some_and(|max| max < batch_min));
let last_affected = self
.frozen
.partition_point(|s| s.min_timestamp().is_some_and(|min| min <= batch_max));
let active_overlaps = self
.active
.min_timestamp()
.is_none_or(|min| min <= batch_max)
|| self
.active
.max_timestamp()
.is_none_or(|max| max >= batch_min)
|| self.active.is_empty();
let affected_count: usize = self.frozen[first_affected..last_affected]
.iter()
.map(|s| s.len())
.sum::<usize>()
+ if active_overlaps {
self.active.len()
} else {
0
}
+ batch.len();
let mut merged_records = Vec::with_capacity(affected_count);
for seg in self.frozen.drain(first_affected..last_affected) {
merged_records = Self::merge_sorted(merged_records, seg.records);
}
if active_overlaps {
let active_records = std::mem::replace(
&mut self.active.records,
Vec::with_capacity(self.segment_capacity),
);
merged_records = Self::merge_sorted(merged_records, active_records);
}
merged_records = Self::merge_sorted(merged_records, batch);
let mut new_segments = Vec::new();
for chunk in merged_records.chunks(self.segment_capacity) {
new_segments.push(Segment::from_sorted(chunk.to_vec()));
}
if active_overlaps {
self.active = Segment::new(self.segment_capacity);
if let Some(last) = new_segments.last() {
if last.len() < self.segment_capacity {
let mut seg = new_segments.pop().unwrap();
seg.frozen = false;
self.active = seg;
}
}
}
let insert_pos = first_affected;
for (i, seg) in new_segments.into_iter().enumerate() {
self.frozen.insert(insert_pos + i, seg);
}
}
/// Merges two timestamp-sorted record lists into one sorted list.
///
/// On equal timestamps the record from `a` is emitted first. When either
/// input is empty the other is returned as-is.
fn merge_sorted(a: Vec<Arc<LogRecord>>, b: Vec<Arc<LogRecord>>) -> Vec<Arc<LogRecord>> {
    if a.is_empty() {
        return b;
    }
    if b.is_empty() {
        return a;
    }

    let mut merged = Vec::with_capacity(a.len() + b.len());
    let mut left = a.into_iter().peekable();
    let mut right = b.into_iter().peekable();

    // Interleave while both sides still have records.
    while let (Some(l), Some(r)) = (left.peek(), right.peek()) {
        let take_left = l.timestamp <= r.timestamp;
        let next = if take_left { left.next() } else { right.next() };
        merged.extend(next);
    }

    // At most one side has anything left; append it unchanged.
    merged.extend(left);
    merged.extend(right);
    merged
}
/// Appends `records` to the end of the frozen list in chunks of
/// `segment_capacity`.
///
/// If the last frozen segment afterwards holds fewer records than the
/// capacity, it is removed from the frozen list, unfrozen, and installed as
/// the active segment.
fn append_records_as_segments(&mut self, records: Vec<Arc<LogRecord>>) {
    let capacity = self.segment_capacity;
    self.frozen.extend(
        records
            .chunks(capacity)
            .map(|chunk| Segment::from_sorted(chunk.to_vec())),
    );

    let tail_is_partial = self.frozen.last().is_some_and(|seg| seg.len() < capacity);
    if tail_is_partial {
        if let Some(mut tail) = self.frozen.pop() {
            tail.frozen = false;
            self.active = tail;
        }
    }
}
/// Returns an owned copy of every record in timestamp order, including those
/// still waiting in the out-of-order buffer.
///
/// Buffered records are sorted by timestamp and merged with the main
/// sequence; on equal timestamps the main-sequence record comes first.
///
/// The merge works on the shared `Arc` handles and each record is cloned
/// exactly once at the end, rather than being cloned, re-wrapped in a fresh
/// `Arc`, and unwrapped again.
pub fn records(&self) -> Vec<LogRecord> {
    let main = self
        .frozen
        .iter()
        .flat_map(|seg| seg.records.iter())
        .chain(self.active.records.iter());

    if self.ooo_buffer.is_empty() {
        return main.map(|r| (**r).clone()).collect();
    }

    let main: Vec<Arc<LogRecord>> = main.cloned().collect();
    let mut pending = self.ooo_buffer.clone();
    pending.sort_by_key(|r| r.timestamp);

    Self::merge_sorted(main, pending)
        .iter()
        .map(|r| (**r).clone())
        .collect()
}
/// Iterates over the main sequence by reference: every frozen segment in
/// order, then the active segment. Records in the out-of-order buffer are
/// not visited.
pub fn iter(&self) -> impl Iterator<Item = &LogRecord> {
    let frozen = self.frozen.iter().flat_map(|seg| seg.records.iter());
    let active = self.active.records.iter();
    frozen.chain(active).map(|shared| &**shared)
}
/// Iterates over the shared handles of the main sequence: every frozen
/// segment in order, then the active segment. Records in the out-of-order
/// buffer are not visited.
pub fn iter_arc(&self) -> impl Iterator<Item = &Arc<LogRecord>> {
    self.frozen
        .iter()
        .chain(std::iter::once(&self.active))
        .flat_map(|seg| seg.records.iter())
}
/// Returns an owned copy of every record; an alias for [`LogStore::records`].
pub fn iter_all(&self) -> Vec<LogRecord> {
self.records()
}
/// Looks up a record by position.
///
/// Indices below the main-sequence length address the frozen and active
/// segments; indices from there on address the out-of-order buffer in its
/// current (arrival) order. Returns `None` when `index` is out of range.
pub fn get(&self, index: usize) -> Option<&LogRecord> {
    match index.checked_sub(self.main_len) {
        // `index < main_len`: the record lives in a segment.
        None => self.get_main(index),
        Some(buffer_idx) => self.ooo_buffer.get(buffer_idx).map(|r| &**r),
    }
}
/// Looks up a record of the main sequence by position, walking the frozen
/// segments and then the active segment. Returns `None` when `index` is not
/// below `main_len`.
fn get_main(&self, index: usize) -> Option<&LogRecord> {
    if index >= self.main_len {
        return None;
    }

    // `local` is the index relative to the segment under inspection.
    let mut local = index;
    for seg in &self.frozen {
        if let Some(record) = seg.records.get(local) {
            return Some(&**record);
        }
        local -= seg.len();
    }
    self.active.records.get(local).map(|r| &**r)
}
/// Total number of records: the main sequence plus the out-of-order buffer.
pub fn len(&self) -> usize {
self.main_len + self.ooo_buffer.len()
}
/// Returns `true` when the store holds no records at all.
pub fn is_empty(&self) -> bool {
self.len() == 0
}
/// Number of records currently waiting in the out-of-order buffer.
pub fn ooo_len(&self) -> usize {
self.ooo_buffer.len()
}
pub fn find_by_timestamp(&self, ts: &DateTime<Utc>) -> usize {
let mut global_offset = 0;
for seg in &self.frozen {
if seg.max_timestamp().is_some_and(|max| max < *ts) {
global_offset += seg.len();
continue;
}
let local_pos = seg.records.partition_point(|r| r.timestamp < *ts);
return global_offset + local_pos;
}
let local_pos = self.active.records.partition_point(|r| r.timestamp < *ts);
global_offset + local_pos
}
/// Returns an iterator over main-sequence positions `start..end`.
///
/// `end` is clamped to the main-sequence length and `start` to `end`, so an
/// out-of-range or inverted request yields an empty iterator. The iterator
/// borrows the records in place and does not visit the out-of-order buffer.
pub fn range(&self, start: usize, end: usize) -> SegmentRangeIter<'_> {
    let end = end.min(self.main_len);
    let start = start.min(end);

    let mut slices = Vec::new();
    if start == end {
        return SegmentRangeIter::new(slices);
    }

    // `base` is the global index of the first record of the current segment.
    let mut base = 0usize;
    let segments = self.frozen.iter().chain(std::iter::once(&self.active));
    for seg in segments {
        if base >= end {
            break;
        }
        let next_base = base + seg.len();
        if start < next_base {
            let lo = start.saturating_sub(base);
            let hi = (end - base).min(seg.len());
            slices.push(&seg.records[lo..hi]);
        }
        base = next_base;
    }

    SegmentRangeIter::new(slices)
}
/// Same span as [`LogStore::range`], but returns cloned records in a `Vec`.
pub fn range_collected(&self, start: usize, end: usize) -> Vec<LogRecord> {
self.range(start, end).cloned().collect()
}
/// Folds every record of the out-of-order buffer into the main sequence.
///
/// Buffered records are sorted, grouped by the segment they belong to, and
/// merged into that segment. A frozen segment that grows beyond twice the
/// segment capacity is split in half. Records that belong after every frozen
/// segment are merged into the active segment. `main_len` is recomputed from
/// the segments once all groups are placed.
pub fn compact_ooo(&mut self) {
    if self.ooo_buffer.is_empty() {
        return;
    }

    let mut pending = std::mem::take(&mut self.ooo_buffer);
    pending.sort_by_key(|r| r.timestamp);

    // Sorted input means target indices never decrease, so consecutive
    // records with the same target form one group.
    let mut groups: Vec<(usize, Vec<Arc<LogRecord>>)> = Vec::new();
    for record in pending {
        let seg_idx = self.find_segment_for_timestamp(&record.timestamp);
        if let Some(last) = groups.last_mut() {
            if last.0 == seg_idx {
                last.1.push(record);
                continue;
            }
        }
        groups.push((seg_idx, vec![record]));
    }

    // Highest index first: a split inserts a segment *after* `seg_idx`, which
    // must not shift the indices of groups still to be processed.
    for (seg_idx, records) in groups.into_iter().rev() {
        if seg_idx < self.frozen.len() {
            let existing = std::mem::take(&mut self.frozen[seg_idx].records);
            self.frozen[seg_idx].records = Self::merge_sorted(existing, records);
            if self.frozen[seg_idx].len() > self.segment_capacity * 2 {
                self.split_segment(seg_idx);
            }
        } else {
            // One linear merge instead of a sorted insert per record; ties
            // still land after the records already in the active segment.
            let existing = std::mem::take(&mut self.active.records);
            self.active.records = Self::merge_sorted(existing, records);
        }
    }

    self.main_len = self.frozen.iter().map(|s| s.len()).sum::<usize>() + self.active.len();
}
/// Compacts the out-of-order buffer once it holds at least a quarter of the
/// segment capacity.
fn maybe_compact_ooo(&mut self) {
    let limit = self.segment_capacity / 4;
    if self.ooo_buffer.len() < limit {
        return;
    }
    self.compact_ooo();
}
/// Removes every record from the store.
///
/// When auto-tuning is active and no explicit capacity was configured, the
/// segment capacity is reset to the default; otherwise it is kept. A fresh
/// active segment is allocated with the resulting capacity.
pub fn clear(&mut self) {
    let reset_capacity = self.auto_tune && !self.user_override;
    if reset_capacity {
        self.segment_capacity = DEFAULT_SEGMENT_CAPACITY;
    }

    self.active = Segment::new(self.segment_capacity);
    self.main_len = 0;
    self.ooo_buffer.clear();
    self.frozen.clear();
}
/// Number of segments: all frozen segments plus the active one, which is
/// counted even when it is empty.
pub fn segment_count(&self) -> usize {
self.frozen.len() + 1
}
/// Current target number of records per segment.
pub fn segment_capacity(&self) -> usize {
self.segment_capacity
}
/// Re-derives the segment capacity from the main-sequence length, unless
/// auto-tuning is off or the caller fixed the capacity explicitly.
fn maybe_auto_tune(&mut self) {
    let tuning_enabled = self.auto_tune && !self.user_override;
    if tuning_enabled {
        self.segment_capacity = optimal_segment_capacity(self.main_len);
    }
}
/// Once the active segment has reached the segment capacity, freezes it,
/// moves it to the end of the frozen list, and starts a fresh active segment.
fn maybe_freeze_active(&mut self) {
    if self.active.len() < self.segment_capacity {
        return;
    }

    let fresh = Segment::new(self.segment_capacity);
    let mut full = std::mem::replace(&mut self.active, fresh);
    full.freeze();
    self.frozen.push(full);
}
/// Returns the index of the first frozen segment whose newest record is not
/// older than `ts`, or `frozen.len()` when every frozen segment ends before
/// `ts`.
fn find_segment_for_timestamp(&self, ts: &DateTime<Utc>) -> usize {
self.frozen
.partition_point(|s| s.max_timestamp().is_some_and(|max| max < *ts))
}
/// Splits the frozen segment at `idx` in half; the upper half becomes a new
/// frozen segment inserted directly after it.
///
/// # Panics
/// Panics if `idx` is out of bounds for the frozen list.
fn split_segment(&mut self, idx: usize) {
    let records = &mut self.frozen[idx].records;
    let mid = records.len() / 2;
    let upper_half = records.split_off(mid);
    self.frozen
        .insert(idx + 1, Segment::from_sorted(upper_half));
}
}
impl Default for LogStore {
/// Equivalent to [`LogStore::new`].
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
#[path = "store_tests.rs"]
mod store_tests;