1use crate::metrics::{CoreMetrics, Timer};
2use crate::storage::{LogManager, TieredStorage};
3use crate::{Config, Message, Result};
4use bytes::Bytes;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Arc;
7use tokio::sync::RwLock;
8use tracing::{debug, info, warn};
9
#[derive(Debug)]
pub struct Partition {
    /// Topic this partition belongs to.
    topic: String,

    /// Partition index within the topic.
    id: u32,

    /// Append-only segment log; `RwLock` serializes writers while allowing
    /// concurrent readers.
    log_manager: Arc<RwLock<LogManager>>,

    /// Optional secondary storage tier. Writes to it are best-effort:
    /// failures are logged, not propagated (data is already in the log).
    tiered_storage: Option<Arc<TieredStorage>>,

    /// Offset that will be assigned to the next appended message
    /// (recovered from the on-disk log at startup).
    next_offset: AtomicU64,

    /// Lowest readable offset; reads below it are clamped up and
    /// segments below it may be truncated.
    low_watermark: AtomicU64,
}
33
34impl Partition {
35 pub async fn new(config: &Config, topic: &str, id: u32) -> Result<Self> {
37 Self::new_with_tiered_storage(config, topic, id, None).await
38 }
39
40 pub async fn new_with_tiered_storage(
42 config: &Config,
43 topic: &str,
44 id: u32,
45 tiered_storage: Option<Arc<TieredStorage>>,
46 ) -> Result<Self> {
47 info!(
48 "Creating partition {} for topic {} (tiered_storage: {})",
49 id,
50 topic,
51 tiered_storage.is_some()
52 );
53 let base_dir = std::path::PathBuf::from(&config.data_dir);
54 let log_manager = LogManager::new(base_dir, topic, id, config.max_segment_size).await?;
55
56 let recovered_offset = log_manager.recover_next_offset().await?;
58 let next_offset = AtomicU64::new(recovered_offset);
59
60 Ok(Self {
61 topic: topic.to_string(),
62 id,
63 log_manager: Arc::new(RwLock::new(log_manager)),
64 tiered_storage,
65 next_offset,
66 low_watermark: AtomicU64::new(0),
67 })
68 }
69
70 pub fn id(&self) -> u32 {
72 self.id
73 }
74
75 pub fn topic(&self) -> &str {
77 &self.topic
78 }
79
80 pub fn has_tiered_storage(&self) -> bool {
82 self.tiered_storage.is_some()
83 }
84
85 pub async fn append(&self, mut message: Message) -> Result<u64> {
88 let timer = Timer::new();
89
90 let mut log = self.log_manager.write().await;
92
93 let offset = self.next_offset.fetch_add(1, Ordering::AcqRel);
97
98 message.offset = offset;
99
100 let tiered_bytes = if self.tiered_storage.is_some() {
103 match message.to_bytes() {
104 Ok(data) => Some(Bytes::from(data)),
105 Err(e) => {
106 warn!("Failed to serialize for tiered storage: {} (continuing)", e);
107 None
108 }
109 }
110 } else {
111 None
112 };
113
114 if let Err(e) = log.append(offset, message).await {
116 let _ = self.next_offset.compare_exchange(
118 offset + 1,
119 offset,
120 Ordering::AcqRel,
121 Ordering::Relaxed,
122 );
123 return Err(e);
124 }
125 drop(log);
126
127 if let (Some(tiered), Some(data)) = (&self.tiered_storage, tiered_bytes) {
129 if let Err(e) = tiered
130 .write(&self.topic, self.id, offset, offset + 1, data)
131 .await
132 {
133 warn!(
135 "Failed to write to tiered storage: {} (data safe in log)",
136 e
137 );
138 }
139 }
140
141 CoreMetrics::increment_messages_appended();
143 CoreMetrics::record_append_latency_us(timer.elapsed_us());
144
145 debug!(
146 "Appended message at offset {} to partition {}",
147 offset, self.id
148 );
149
150 Ok(offset)
151 }
152
153 pub async fn read(&self, start_offset: u64, max_messages: usize) -> Result<Vec<Message>> {
155 let timer = Timer::new();
156
157 let wm = self.low_watermark.load(Ordering::Acquire);
159 let effective_offset = start_offset.max(wm);
160
161 let log = self.log_manager.read().await;
162 let messages = log.read(effective_offset, max_messages * 4096).await?;
164
165 let result: Vec<Message> = messages.into_iter().take(max_messages).collect();
166
167 CoreMetrics::add_messages_read(result.len() as u64);
169 CoreMetrics::record_read_latency_us(timer.elapsed_us());
170
171 debug!(
172 "Read {} messages from partition {} starting at offset {}",
173 result.len(),
174 self.id,
175 start_offset
176 );
177
178 Ok(result)
179 }
180
181 pub async fn latest_offset(&self) -> u64 {
183 self.next_offset.load(Ordering::Acquire)
184 }
185
186 pub async fn earliest_offset(&self) -> Option<u64> {
187 let log_earliest = {
188 let log = self.log_manager.read().await;
189 log.earliest_offset()
190 };
191 let wm = self.low_watermark.load(Ordering::Acquire);
192 Some(log_earliest.max(wm))
193 }
194
195 pub async fn set_low_watermark(&self, offset: u64) {
204 self.low_watermark.fetch_max(offset, Ordering::Release);
205
206 let mut log = self.log_manager.write().await;
210 match log.truncate_before(offset) {
211 Ok(0) => {}
212 Ok(n) => {
213 info!(
214 "Partition {}/{}: truncated {} segment(s) below watermark {}",
215 self.topic, self.id, n, offset
216 );
217 }
218 Err(e) => {
219 warn!(
220 "Partition {}/{}: segment truncation failed: {}",
221 self.topic, self.id, e
222 );
223 }
224 }
225 }
226
227 pub fn low_watermark(&self) -> u64 {
229 self.low_watermark.load(Ordering::Acquire)
230 }
231
232 pub async fn message_count(&self) -> usize {
233 let earliest = self.earliest_offset().await.unwrap_or(0);
234 let next = self.next_offset.load(Ordering::SeqCst);
235 (next.saturating_sub(earliest)) as usize
236 }
237
238 pub async fn append_batch(&self, messages: Vec<Message>) -> Result<Vec<u64>> {
241 if messages.is_empty() {
242 return Ok(Vec::new());
243 }
244
245 let timer = Timer::new();
246 let batch_size = messages.len();
247
248 let start_offset = self
250 .next_offset
251 .fetch_add(batch_size as u64, Ordering::SeqCst);
252
253 let mut offsets = Vec::with_capacity(batch_size);
254 let mut batch_messages = Vec::with_capacity(batch_size);
255 let mut batch_data = Vec::new();
256
257 for (i, mut message) in messages.into_iter().enumerate() {
259 let offset = start_offset + i as u64;
260 message.offset = offset;
261
262 if self.tiered_storage.is_some() {
264 if let Ok(data) = message.to_bytes() {
265 batch_data.extend_from_slice(&data);
266 }
267 }
268
269 batch_messages.push((offset, message));
270 offsets.push(offset);
271 }
272
273 {
275 let mut log = self.log_manager.write().await;
276 log.append_batch(batch_messages).await?;
277 }
278
279 if let Some(tiered) = &self.tiered_storage {
281 if !batch_data.is_empty() {
282 let end_offset = start_offset + batch_size as u64;
283 if let Err(e) = tiered
284 .write(
285 &self.topic,
286 self.id,
287 start_offset,
288 end_offset,
289 Bytes::from(batch_data),
290 )
291 .await
292 {
293 warn!(
295 "Failed to write batch to tiered storage: {} (data safe in log)",
296 e
297 );
298 }
299 }
300 }
301
302 CoreMetrics::increment_batch_appends();
304 CoreMetrics::add_messages_appended(batch_size as u64);
305 CoreMetrics::record_batch_append_latency_us(timer.elapsed_us());
306
307 debug!(
308 "Batch appended {} messages to partition {} (offsets {}-{})",
309 batch_size,
310 self.id,
311 start_offset,
312 start_offset + batch_size as u64 - 1
313 );
314
315 Ok(offsets)
316 }
317
318 pub async fn flush(&self) -> Result<()> {
320 let log = self.log_manager.read().await;
321 log.flush().await?;
322
323 if let Some(tiered) = &self.tiered_storage {
325 tiered.flush_hot_tier(&self.topic, self.id).await?;
326 }
327
328 Ok(())
329 }
330
331 pub async fn find_offset_for_timestamp(&self, target_timestamp: i64) -> Result<Option<u64>> {
334 let log = self.log_manager.read().await;
335 log.find_offset_for_timestamp(target_timestamp).await
336 }
337
338 pub fn tiered_storage_stats(&self) -> Option<crate::storage::TieredStorageStatsSnapshot> {
340 self.tiered_storage.as_ref().map(|ts| ts.stats())
341 }
342}
343
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Config;
    use bytes::Bytes;
    use std::fs;

    /// Fresh config rooted at a unique temp directory, cleared up front so
    /// each test starts from an empty partition.
    fn get_test_config() -> Config {
        let config = Config {
            data_dir: format!("/tmp/rivven-test-partition-{}", uuid::Uuid::new_v4()),
            ..Default::default()
        };
        let _ = fs::remove_dir_all(&config.data_dir);
        config
    }

    #[tokio::test]
    async fn test_partition_persistence() {
        let config = get_test_config();
        let topic = "test-topic";
        let part_id = 0;

        // First incarnation: write two messages and read them back.
        {
            let part = Partition::new(&config, topic, part_id).await.unwrap();

            part.append(Message::new(Bytes::from("msg1"))).await.unwrap();
            part.append(Message::new(Bytes::from("msg2"))).await.unwrap();

            let got = part.read(0, 10).await.unwrap();
            assert_eq!(got.len(), 2);
            assert_eq!(got[0].value, Bytes::from("msg1"));
            assert_eq!(got[1].value, Bytes::from("msg2"));
        }

        // Second incarnation over the same data dir: offsets and contents
        // must survive reopen, and new appends continue after them.
        {
            let part = Partition::new(&config, topic, part_id).await.unwrap();

            assert_eq!(part.latest_offset().await, 2);

            let got = part.read(0, 10).await.unwrap();
            assert_eq!(got.len(), 2);
            assert_eq!(got[0].value, Bytes::from("msg1"));

            part.append(Message::new(Bytes::from("msg3"))).await.unwrap();
            let got = part.read(0, 10).await.unwrap();
            assert_eq!(got.len(), 3);
            assert_eq!(got[2].value, Bytes::from("msg3"));
        }

        fs::remove_dir_all(&config.data_dir).unwrap();
    }

    #[tokio::test]
    async fn test_find_offset_for_timestamp() {
        let config = get_test_config();
        let topic = "test-topic-ts";
        let part_id = 0;

        let part = Partition::new(&config, topic, part_id).await.unwrap();

        // Append five messages with small gaps so their timestamps differ.
        for i in 0..5 {
            part.append(Message::new(Bytes::from(format!("msg{}", i))))
                .await
                .unwrap();
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        }

        let all = part.read(0, 10).await.unwrap();
        assert_eq!(all.len(), 5);

        let ts_msg2 = all[2].timestamp.timestamp_millis();

        // The exact timestamp of message 2 resolves to offset 2.
        let found = part.find_offset_for_timestamp(ts_msg2).await.unwrap();
        assert_eq!(
            found,
            Some(2),
            "Should find offset 2 for timestamp {}",
            ts_msg2
        );

        // A timestamp older than everything resolves to the first offset.
        let very_old_ts = ts_msg2 - 10000;
        let found = part
            .find_offset_for_timestamp(very_old_ts)
            .await
            .unwrap();
        assert_eq!(
            found,
            Some(0),
            "Should find offset 0 for very old timestamp"
        );

        // A timestamp in the future matches nothing.
        let future_ts = chrono::Utc::now().timestamp_millis() + 60000;
        let found = part
            .find_offset_for_timestamp(future_ts)
            .await
            .unwrap();
        assert_eq!(
            found, None,
            "Should return None for future timestamp"
        );

        fs::remove_dir_all(&config.data_dir).unwrap();
    }
}