use std::sync::atomic::{AtomicU64, Ordering};
use super::LSN;
use super::ring_buffer::{
CompletionHandle, DEFAULT_RING_BUFFER_CAPACITY, PendingEntry, WalRingBuffer,
};
pub struct WalStripe {
id: usize,
ring_buffer: WalRingBuffer,
append_count: AtomicU64,
bytes_appended: AtomicU64,
}
impl WalStripe {
pub fn new(id: usize) -> Self {
Self::with_capacity(id, DEFAULT_RING_BUFFER_CAPACITY)
}
pub fn with_capacity(id: usize, capacity: usize) -> Self {
Self {
id,
ring_buffer: WalRingBuffer::new(capacity),
append_count: AtomicU64::new(0),
bytes_appended: AtomicU64::new(0),
}
}
#[inline]
pub fn id(&self) -> usize {
self.id
}
pub fn append_async(&self, lsn: LSN, data: Vec<u8>) -> Result<(), PendingEntry> {
let entry = PendingEntry::new_async(lsn, data);
self.append_entry(entry)
}
pub fn append_sync(&self, lsn: LSN, data: Vec<u8>) -> Result<CompletionHandle, PendingEntry> {
let (entry, handle) = PendingEntry::new_sync(lsn, data);
self.append_entry(entry).map(|()| handle)
}
pub fn append_sync_blocking(
&self,
lsn: LSN,
data: Vec<u8>,
) -> Result<CompletionHandle, PendingEntry> {
let (entry, handle) = PendingEntry::new_sync(lsn, data);
self.append_entry_blocking(entry).map(|()| handle)
}
pub fn append_blocking(&self, lsn: LSN, data: Vec<u8>) -> Result<(), PendingEntry> {
let entry = PendingEntry::new_async(lsn, data);
self.append_entry_blocking(entry)
}
fn append_entry(&self, entry: PendingEntry) -> Result<(), PendingEntry> {
let data_len = entry.data.len();
match self.ring_buffer.try_append(entry) {
Ok(()) => {
self.append_count.fetch_add(1, Ordering::Relaxed);
self.bytes_appended
.fetch_add(data_len as u64, Ordering::Relaxed);
Ok(())
}
Err(e) => Err(e),
}
}
fn append_entry_blocking(&self, entry: PendingEntry) -> Result<(), PendingEntry> {
let data_len = entry.data.len();
match self.ring_buffer.append_blocking(entry) {
Ok(()) => {
self.append_count.fetch_add(1, Ordering::Relaxed);
self.bytes_appended
.fetch_add(data_len as u64, Ordering::Relaxed);
Ok(())
}
Err(e) => Err(e),
}
}
pub fn drain(&self) -> Vec<PendingEntry> {
self.ring_buffer.drain()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.ring_buffer.is_empty_approx()
}
#[inline]
pub fn pending_count(&self) -> usize {
self.ring_buffer.len_approx()
}
#[inline]
pub fn total_appends(&self) -> u64 {
self.append_count.load(Ordering::Relaxed)
}
#[inline]
pub fn total_bytes(&self) -> u64 {
self.bytes_appended.load(Ordering::Relaxed)
}
pub fn close(&self) {
self.ring_buffer.close();
}
#[inline]
pub fn is_closed(&self) -> bool {
self.ring_buffer.is_closed()
}
}
#[derive(Debug, Clone)]
pub struct StripeMetrics {
pub id: usize,
pub total_appends: u64,
pub total_bytes: u64,
pub pending_count: usize,
}
impl WalStripe {
pub fn metrics(&self) -> StripeMetrics {
StripeMetrics {
id: self.id,
total_appends: self.total_appends(),
total_bytes: self.total_bytes(),
pending_count: self.pending_count(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::thread;
#[test]
fn test_stripe_creation() {
let stripe = WalStripe::new(0);
assert_eq!(stripe.id(), 0);
assert!(stripe.is_empty());
assert!(!stripe.is_closed());
}
#[test]
fn test_stripe_with_custom_capacity() {
let stripe = WalStripe::with_capacity(5, 64);
assert_eq!(stripe.id(), 5);
}
#[test]
fn test_stripe_append_async() {
let stripe = WalStripe::new(0);
let result = stripe.append_async(LSN(1), vec![1, 2, 3]);
assert!(result.is_ok());
assert_eq!(stripe.total_appends(), 1);
assert_eq!(stripe.total_bytes(), 3);
assert_eq!(stripe.pending_count(), 1);
}
#[test]
fn test_stripe_append_sync() {
let stripe = WalStripe::new(0);
let result = stripe.append_sync(LSN(1), vec![1, 2, 3, 4]);
assert!(result.is_ok());
let handle = result.unwrap();
assert!(!handle.is_complete());
assert_eq!(stripe.total_appends(), 1);
assert_eq!(stripe.total_bytes(), 4);
}
#[test]
fn test_stripe_drain() {
let stripe = WalStripe::new(0);
for i in 0..5 {
stripe.append_async(LSN(i), vec![i as u8]).unwrap();
}
assert_eq!(stripe.pending_count(), 5);
let entries = stripe.drain();
assert_eq!(entries.len(), 5);
assert!(stripe.is_empty());
for (i, entry) in entries.iter().enumerate() {
assert_eq!(entry.lsn, LSN(i as u64));
}
}
#[test]
fn test_stripe_close() {
let stripe = WalStripe::new(0);
stripe.close();
assert!(stripe.is_closed());
let result = stripe.append_async(LSN(1), vec![]);
assert!(result.is_err());
}
#[test]
fn test_stripe_metrics() {
let stripe = WalStripe::new(42);
stripe.append_async(LSN(1), vec![1, 2, 3]).unwrap();
stripe.append_async(LSN(2), vec![4, 5]).unwrap();
let metrics = stripe.metrics();
assert_eq!(metrics.id, 42);
assert_eq!(metrics.total_appends, 2);
assert_eq!(metrics.total_bytes, 5);
assert_eq!(metrics.pending_count, 2);
}
#[test]
fn test_stripe_concurrent_appends() {
let stripe = Arc::new(WalStripe::with_capacity(0, 1024));
let num_threads = 4;
let appends_per_thread = 100;
let handles: Vec<_> = (0..num_threads)
.map(|t| {
let stripe = Arc::clone(&stripe);
thread::spawn(move || {
for i in 0..appends_per_thread {
let lsn = LSN((t * appends_per_thread + i) as u64);
stripe.append_async(lsn, vec![t as u8]).unwrap();
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
assert_eq!(
stripe.total_appends(),
(num_threads * appends_per_thread) as u64
);
}
#[test]
fn test_stripe_sync_completion() {
let stripe = WalStripe::new(0);
let handle = stripe.append_sync(LSN(1), vec![1]).unwrap();
let entries = stripe.drain();
assert_eq!(entries.len(), 1);
entries[0].notify_completion();
assert!(handle.is_complete());
assert!(handle.wait().is_ok());
}
#[test]
fn test_stripe_sync_error_notification() {
let stripe = WalStripe::new(0);
let handle = stripe.append_sync(LSN(1), vec![1]).unwrap();
let entries = stripe.drain();
entries[0].notify_error("test error");
assert!(handle.is_complete());
let result = handle.wait();
assert!(result.is_err());
assert_eq!(result.unwrap_err(), "test error");
}
#[test]
fn test_stripe_interleaved_append_drain() {
let stripe = WalStripe::new(0);
for cycle in 0..5 {
for i in 0..3 {
stripe
.append_async(LSN((cycle * 3 + i) as u64), vec![])
.unwrap();
}
let entries = stripe.drain();
assert_eq!(entries.len(), 3);
assert_eq!(stripe.total_appends(), ((cycle + 1) * 3) as u64);
}
}
#[test]
fn test_stripe_metrics_on_failure() {
let stripe = WalStripe::with_capacity(0, 2);
let mut successful_appends = 0;
for i in 0..10 {
let payload = vec![i as u8];
match stripe.append_async(LSN(i), payload) {
Ok(_) => successful_appends += 1,
Err(_) => {
}
}
}
assert!(
successful_appends < 10,
"Expected some appends to fail due to full buffer"
);
assert_eq!(stripe.total_appends(), successful_appends);
}
#[test]
fn test_stripe_metrics_on_closed_failure() {
let stripe = Arc::new(WalStripe::with_capacity(1, 10));
stripe.append_async(LSN(1), vec![1]).unwrap();
stripe.close();
let result = stripe.append_blocking(LSN(2), vec![2]);
assert!(result.is_err());
assert_eq!(stripe.total_appends(), 1);
assert_eq!(stripe.total_bytes(), 1);
}
}