//! Partition: a single ordered log within a topic (rivven_core/partition.rs).

1use crate::metrics::{CoreMetrics, Timer};
2use crate::storage::{LogManager, TieredStorage};
3use crate::{Config, Message, Result};
4use bytes::Bytes;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Arc;
7use tokio::sync::RwLock;
8use tracing::{debug, info, warn};
9
/// A single partition within a topic
///
/// Wraps a [`LogManager`] (authoritative on-disk log) plus an optional
/// [`TieredStorage`] mirror, and tracks offset bookkeeping atomically.
#[derive(Debug)]
pub struct Partition {
    /// Topic name (for tiered storage)
    topic: String,

    /// Partition ID
    id: u32,

    /// Storage Manager
    log_manager: Arc<RwLock<LogManager>>,

    /// Tiered storage (optional, for hot/warm/cold data tiering)
    tiered_storage: Option<Arc<TieredStorage>>,

    /// Current offset (next offset to be assigned)
    // NOTE(review): despite earlier "lock-free" comments, every mutation of
    // this counter in `append`/`append_batch` happens while the log write
    // lock is held; the atomic allows lock-free *reads* (latest_offset).
    next_offset: AtomicU64,

    /// Low watermark: records before this offset are logically deleted.
    /// Set via `set_low_watermark()` (e.g., from DeleteRecords API).
    low_watermark: AtomicU64,
}
33
34impl Partition {
35    /// Create a new partition
36    pub async fn new(config: &Config, topic: &str, id: u32) -> Result<Self> {
37        Self::new_with_tiered_storage(config, topic, id, None).await
38    }
39
40    /// Create a new partition with optional tiered storage
41    pub async fn new_with_tiered_storage(
42        config: &Config,
43        topic: &str,
44        id: u32,
45        tiered_storage: Option<Arc<TieredStorage>>,
46    ) -> Result<Self> {
47        info!(
48            "Creating partition {} for topic {} (tiered_storage: {})",
49            id,
50            topic,
51            tiered_storage.is_some()
52        );
53        let base_dir = std::path::PathBuf::from(&config.data_dir);
54        let log_manager = LogManager::new(base_dir, topic, id, config.max_segment_size).await?;
55
56        // Recover offset from storage
57        let recovered_offset = log_manager.recover_next_offset().await?;
58        let next_offset = AtomicU64::new(recovered_offset);
59
60        Ok(Self {
61            topic: topic.to_string(),
62            id,
63            log_manager: Arc::new(RwLock::new(log_manager)),
64            tiered_storage,
65            next_offset,
66            low_watermark: AtomicU64::new(0),
67        })
68    }
69
70    /// Get the partition ID
71    pub fn id(&self) -> u32 {
72        self.id
73    }
74
75    /// Get the topic name
76    pub fn topic(&self) -> &str {
77        &self.topic
78    }
79
80    /// Check if tiered storage is enabled
81    pub fn has_tiered_storage(&self) -> bool {
82        self.tiered_storage.is_some()
83    }
84
85    /// Append a message to the partition
86    /// Lock-free implementation using AtomicU64 for 5-10x throughput
87    pub async fn append(&self, mut message: Message) -> Result<u64> {
88        let timer = Timer::new();
89
90        // Acquire write lock first, then allocate offset to avoid gaps on failure
91        let mut log = self.log_manager.write().await;
92
93        // Lock-free offset allocation - single atomic operation
94        // Using AcqRel: ensures our write is visible to other threads (Release)
95        // and we see all previous writes (Acquire). SeqCst is unnecessary here.
96        let offset = self.next_offset.fetch_add(1, Ordering::AcqRel);
97
98        message.offset = offset;
99
100        // Pre-serialize for tiered storage BEFORE consuming the message,
101        // avoiding a full clone. Only serialize if tiered storage is enabled.
102        let tiered_bytes = if self.tiered_storage.is_some() {
103            match message.to_bytes() {
104                Ok(data) => Some(Bytes::from(data)),
105                Err(e) => {
106                    warn!("Failed to serialize for tiered storage: {} (continuing)", e);
107                    None
108                }
109            }
110        } else {
111            None
112        };
113
114        // Write to log manager (primary storage) — consumes message, no clone needed
115        if let Err(e) = log.append(offset, message).await {
116            // Reclaim the offset on failure to prevent gaps
117            let _ = self.next_offset.compare_exchange(
118                offset + 1,
119                offset,
120                Ordering::AcqRel,
121                Ordering::Relaxed,
122            );
123            return Err(e);
124        }
125        drop(log);
126
127        // Write pre-serialized bytes to tiered storage (no second serialization)
128        if let (Some(tiered), Some(data)) = (&self.tiered_storage, tiered_bytes) {
129            if let Err(e) = tiered
130                .write(&self.topic, self.id, offset, offset + 1, data)
131                .await
132            {
133                // Log warning but don't fail - log manager has the authoritative copy
134                warn!(
135                    "Failed to write to tiered storage: {} (data safe in log)",
136                    e
137                );
138            }
139        }
140
141        // Record metrics
142        CoreMetrics::increment_messages_appended();
143        CoreMetrics::record_append_latency_us(timer.elapsed_us());
144
145        debug!(
146            "Appended message at offset {} to partition {}",
147            offset, self.id
148        );
149
150        Ok(offset)
151    }
152
153    /// Read messages from a given offset
154    pub async fn read(&self, start_offset: u64, max_messages: usize) -> Result<Vec<Message>> {
155        let timer = Timer::new();
156
157        // Respect low watermark — don't serve records before it
158        let wm = self.low_watermark.load(Ordering::Acquire);
159        let effective_offset = start_offset.max(wm);
160
161        let log = self.log_manager.read().await;
162        // Estimate size: 4KB per message to be safe/generous for the 'max_bytes' parameter of log.read
163        let messages = log.read(effective_offset, max_messages * 4096).await?;
164
165        let result: Vec<Message> = messages.into_iter().take(max_messages).collect();
166
167        // Record metrics
168        CoreMetrics::add_messages_read(result.len() as u64);
169        CoreMetrics::record_read_latency_us(timer.elapsed_us());
170
171        debug!(
172            "Read {} messages from partition {} starting at offset {}",
173            result.len(),
174            self.id,
175            start_offset
176        );
177
178        Ok(result)
179    }
180
181    /// Get the latest offset
182    pub async fn latest_offset(&self) -> u64 {
183        self.next_offset.load(Ordering::Acquire)
184    }
185
186    pub async fn earliest_offset(&self) -> Option<u64> {
187        let log_earliest = {
188            let log = self.log_manager.read().await;
189            log.earliest_offset()
190        };
191        let wm = self.low_watermark.load(Ordering::Acquire);
192        Some(log_earliest.max(wm))
193    }
194
195    /// Set the low watermark for this partition.
196    ///
197    /// Records before this offset are logically deleted and will not be
198    /// returned by `read()`. This implements the DeleteRecords API behavior.
199    /// The watermark can only advance forward (monotonically increasing).
200    ///
201    /// Also triggers physical segment truncation: segments whose data is
202    /// entirely below the watermark are deleted from disk to reclaim space.
203    pub async fn set_low_watermark(&self, offset: u64) {
204        self.low_watermark.fetch_max(offset, Ordering::Release);
205
206        // Physically remove segments that are entirely below the watermark.
207        // This reclaims disk space — the logical watermark alone only hides
208        // records from consumers.
209        let mut log = self.log_manager.write().await;
210        match log.truncate_before(offset) {
211            Ok(0) => {}
212            Ok(n) => {
213                info!(
214                    "Partition {}/{}: truncated {} segment(s) below watermark {}",
215                    self.topic, self.id, n, offset
216                );
217            }
218            Err(e) => {
219                warn!(
220                    "Partition {}/{}: segment truncation failed: {}",
221                    self.topic, self.id, e
222                );
223            }
224        }
225    }
226
227    /// Get the current low watermark
228    pub fn low_watermark(&self) -> u64 {
229        self.low_watermark.load(Ordering::Acquire)
230    }
231
232    pub async fn message_count(&self) -> usize {
233        let earliest = self.earliest_offset().await.unwrap_or(0);
234        let next = self.next_offset.load(Ordering::SeqCst);
235        (next.saturating_sub(earliest)) as usize
236    }
237
238    /// Batch append multiple messages for 20-50x throughput improvement
239    /// Single fsync per batch instead of per-message
240    pub async fn append_batch(&self, messages: Vec<Message>) -> Result<Vec<u64>> {
241        if messages.is_empty() {
242            return Ok(Vec::new());
243        }
244
245        let timer = Timer::new();
246        let batch_size = messages.len();
247
248        // Allocate offsets atomically for entire batch
249        let start_offset = self
250            .next_offset
251            .fetch_add(batch_size as u64, Ordering::SeqCst);
252
253        let mut offsets = Vec::with_capacity(batch_size);
254        let mut batch_messages = Vec::with_capacity(batch_size);
255        let mut batch_data = Vec::new();
256
257        // Prepare messages with offsets
258        for (i, mut message) in messages.into_iter().enumerate() {
259            let offset = start_offset + i as u64;
260            message.offset = offset;
261
262            // Collect data for tiered storage
263            if self.tiered_storage.is_some() {
264                if let Ok(data) = message.to_bytes() {
265                    batch_data.extend_from_slice(&data);
266                }
267            }
268
269            batch_messages.push((offset, message));
270            offsets.push(offset);
271        }
272
273        // Write to log manager using optimized batch append
274        {
275            let mut log = self.log_manager.write().await;
276            log.append_batch(batch_messages).await?;
277        }
278
279        // Also write to tiered storage if enabled
280        if let Some(tiered) = &self.tiered_storage {
281            if !batch_data.is_empty() {
282                let end_offset = start_offset + batch_size as u64;
283                if let Err(e) = tiered
284                    .write(
285                        &self.topic,
286                        self.id,
287                        start_offset,
288                        end_offset,
289                        Bytes::from(batch_data),
290                    )
291                    .await
292                {
293                    // Log warning but don't fail - log manager has the authoritative copy
294                    warn!(
295                        "Failed to write batch to tiered storage: {} (data safe in log)",
296                        e
297                    );
298                }
299            }
300        }
301
302        // Record metrics
303        CoreMetrics::increment_batch_appends();
304        CoreMetrics::add_messages_appended(batch_size as u64);
305        CoreMetrics::record_batch_append_latency_us(timer.elapsed_us());
306
307        debug!(
308            "Batch appended {} messages to partition {} (offsets {}-{})",
309            batch_size,
310            self.id,
311            start_offset,
312            start_offset + batch_size as u64 - 1
313        );
314
315        Ok(offsets)
316    }
317
318    /// Flush partition data to disk ensuring durability
319    pub async fn flush(&self) -> Result<()> {
320        let log = self.log_manager.read().await;
321        log.flush().await?;
322
323        // Also flush tiered storage hot tier if enabled
324        if let Some(tiered) = &self.tiered_storage {
325            tiered.flush_hot_tier(&self.topic, self.id).await?;
326        }
327
328        Ok(())
329    }
330
331    /// Find the first offset with timestamp >= target_timestamp (milliseconds since epoch)
332    /// Returns None if no matching offset is found.
333    pub async fn find_offset_for_timestamp(&self, target_timestamp: i64) -> Result<Option<u64>> {
334        let log = self.log_manager.read().await;
335        log.find_offset_for_timestamp(target_timestamp).await
336    }
337
338    /// Get tiered storage statistics for this partition
339    pub fn tiered_storage_stats(&self) -> Option<crate::storage::TieredStorageStatsSnapshot> {
340        self.tiered_storage.as_ref().map(|ts| ts.stats())
341    }
342}
343
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Config;
    use bytes::Bytes;
    use std::fs;

    /// Build a Config pointing at a fresh, unique throwaway data directory.
    fn get_test_config() -> Config {
        let cfg = Config {
            data_dir: format!("/tmp/rivven-test-partition-{}", uuid::Uuid::new_v4()),
            ..Default::default()
        };
        let _ = fs::remove_dir_all(&cfg.data_dir);
        cfg
    }

    #[tokio::test]
    async fn test_partition_persistence() {
        let cfg = get_test_config();
        let topic = "test-topic";
        let pid = 0;

        // Phase 1: write two messages and verify they read back in order.
        {
            let p = Partition::new(&cfg, topic, pid).await.unwrap();

            for payload in ["msg1", "msg2"] {
                p.append(Message::new(Bytes::from(payload))).await.unwrap();
            }

            let stored = p.read(0, 10).await.unwrap();
            assert_eq!(stored.len(), 2);
            assert_eq!(stored[0].value, Bytes::from("msg1"));
            assert_eq!(stored[1].value, Bytes::from("msg2"));
        }

        // Phase 2: reopen the partition to exercise recovery, then append more.
        {
            let p = Partition::new(&cfg, topic, pid).await.unwrap();

            // Next offset must have been recovered from disk.
            assert_eq!(p.latest_offset().await, 2);

            // Old messages must still be readable.
            let stored = p.read(0, 10).await.unwrap();
            assert_eq!(stored.len(), 2);
            assert_eq!(stored[0].value, Bytes::from("msg1"));

            // Appending after recovery continues the offset sequence.
            p.append(Message::new(Bytes::from("msg3"))).await.unwrap();
            let stored = p.read(0, 10).await.unwrap();
            assert_eq!(stored.len(), 3);
            assert_eq!(stored[2].value, Bytes::from("msg3"));
        }

        fs::remove_dir_all(&cfg.data_dir).unwrap();
    }

    #[tokio::test]
    async fn test_find_offset_for_timestamp() {
        let cfg = get_test_config();
        let topic = "test-topic-ts";
        let pid = 0;

        let p = Partition::new(&cfg, topic, pid).await.unwrap();

        // Append five messages, sleeping briefly so timestamps are distinct.
        for i in 0..5 {
            p.append(Message::new(Bytes::from(format!("msg{}", i))))
                .await
                .unwrap();
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        }

        let msgs = p.read(0, 10).await.unwrap();
        assert_eq!(msgs.len(), 5);

        // Exact timestamp of offset 2 should resolve back to offset 2.
        let ts_msg2 = msgs[2].timestamp.timestamp_millis();
        let found = p.find_offset_for_timestamp(ts_msg2).await.unwrap();
        assert_eq!(
            found,
            Some(2),
            "Should find offset 2 for timestamp {}",
            ts_msg2
        );

        // A timestamp predating everything resolves to the first offset.
        let very_old_ts = ts_msg2 - 10000; // 10 seconds before
        let found = p.find_offset_for_timestamp(very_old_ts).await.unwrap();
        assert_eq!(
            found,
            Some(0),
            "Should find offset 0 for very old timestamp"
        );

        // A timestamp after every message yields no offset.
        let future_ts = chrono::Utc::now().timestamp_millis() + 60000; // 1 minute in future
        let found = p.find_offset_for_timestamp(future_ts).await.unwrap();
        assert_eq!(
            found, None,
            "Should return None for future timestamp"
        );

        fs::remove_dir_all(&cfg.data_dir).unwrap();
    }
}