1use crate::metrics::{CoreMetrics, Timer};
2use crate::storage::LogManager;
3use crate::{Config, Message, Result};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6use tokio::sync::RwLock;
7use tracing::{debug, info};
8
9#[derive(Debug)]
11pub struct Partition {
12 id: u32,
14
15 log_manager: Arc<RwLock<LogManager>>,
17
18 next_offset: AtomicU64,
21}
22
23impl Partition {
24 pub async fn new(config: &Config, topic: &str, id: u32) -> Result<Self> {
26 info!("Creating partition {} for topic {}", id, topic);
27 let base_dir = std::path::PathBuf::from(&config.data_dir);
28 let log_manager = LogManager::new(base_dir, topic, id, config.max_segment_size).await?;
29
30 let recovered_offset = log_manager.recover_next_offset().await?;
32 let next_offset = AtomicU64::new(recovered_offset);
33
34 Ok(Self {
35 id,
36 log_manager: Arc::new(RwLock::new(log_manager)),
37 next_offset,
38 })
39 }
40
41 pub fn id(&self) -> u32 {
43 self.id
44 }
45
46 pub async fn append(&self, mut message: Message) -> Result<u64> {
49 let timer = Timer::new();
50
51 let offset = self.next_offset.fetch_add(1, Ordering::SeqCst);
53
54 message.offset = offset;
55
56 let mut log = self.log_manager.write().await;
57 log.append(offset, message).await?;
58
59 CoreMetrics::increment_messages_appended();
61 CoreMetrics::record_append_latency_us(timer.elapsed_us());
62
63 debug!(
64 "Appended message at offset {} to partition {}",
65 offset, self.id
66 );
67
68 Ok(offset)
69 }
70
71 pub async fn read(&self, start_offset: u64, max_messages: usize) -> Result<Vec<Message>> {
73 let timer = Timer::new();
74
75 let log = self.log_manager.read().await;
76 let messages = log.read(start_offset, max_messages * 4096).await?;
78
79 let result: Vec<Message> = messages.into_iter().take(max_messages).collect();
80
81 CoreMetrics::add_messages_read(result.len() as u64);
83 CoreMetrics::record_read_latency_us(timer.elapsed_us());
84
85 debug!(
86 "Read {} messages from partition {} starting at offset {}",
87 result.len(),
88 self.id,
89 start_offset
90 );
91
92 Ok(result)
93 }
94
95 pub async fn latest_offset(&self) -> u64 {
97 self.next_offset.load(Ordering::SeqCst)
98 }
99
100 pub async fn earliest_offset(&self) -> Option<u64> {
101 let log = self.log_manager.read().await;
102 Some(log.earliest_offset())
103 }
104
105 pub async fn message_count(&self) -> usize {
106 let earliest = self.earliest_offset().await.unwrap_or(0);
107 let next = self.next_offset.load(Ordering::SeqCst);
108 (next.saturating_sub(earliest)) as usize
109 }
110
111 pub async fn append_batch(&self, messages: Vec<Message>) -> Result<Vec<u64>> {
114 if messages.is_empty() {
115 return Ok(Vec::new());
116 }
117
118 let timer = Timer::new();
119 let batch_size = messages.len();
120
121 let start_offset = self
123 .next_offset
124 .fetch_add(batch_size as u64, Ordering::SeqCst);
125
126 let mut offsets = Vec::with_capacity(batch_size);
127 let mut log = self.log_manager.write().await;
128
129 for (i, mut message) in messages.into_iter().enumerate() {
130 let offset = start_offset + i as u64;
131 message.offset = offset;
132 log.append(offset, message).await?;
133 offsets.push(offset);
134 }
135
136 CoreMetrics::increment_batch_appends();
138 CoreMetrics::add_messages_appended(batch_size as u64);
139 CoreMetrics::record_batch_append_latency_us(timer.elapsed_us());
140
141 debug!(
142 "Batch appended {} messages to partition {} (offsets {}-{})",
143 batch_size,
144 self.id,
145 start_offset,
146 start_offset + batch_size as u64 - 1
147 );
148
149 Ok(offsets)
150 }
151
152 pub async fn flush(&self) -> Result<()> {
154 let log = self.log_manager.read().await;
155 log.flush().await
156 }
157
158 pub async fn find_offset_for_timestamp(&self, target_timestamp: i64) -> Result<Option<u64>> {
161 let log = self.log_manager.read().await;
162 log.find_offset_for_timestamp(target_timestamp).await
163 }
164}
165
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Config;
    use bytes::Bytes;
    use std::fs;

    /// Removes a test data directory when dropped, so it is cleaned up even
    /// if an assertion fails part-way through a test.
    struct DirGuard(String);

    impl Drop for DirGuard {
        fn drop(&mut self) {
            // Best-effort cleanup; the directory may never have been created.
            let _ = fs::remove_dir_all(&self.0);
        }
    }

    /// Builds a default config whose `data_dir` is a fresh, uniquely named
    /// directory under the platform temp dir (not a hard-coded `/tmp`).
    fn get_test_config() -> Config {
        let data_dir = std::env::temp_dir()
            .join(format!("rivven-test-partition-{}", uuid::Uuid::new_v4()));
        let config = Config {
            data_dir: data_dir.to_string_lossy().into_owned(),
            ..Default::default()
        };
        let _ = fs::remove_dir_all(&config.data_dir);
        config
    }

    #[tokio::test]
    async fn test_partition_persistence() {
        let config = get_test_config();
        let _cleanup = DirGuard(config.data_dir.clone());
        let topic = "test-topic";
        let part_id = 0;

        // First instance: write two messages.
        {
            let partition = Partition::new(&config, topic, part_id).await.unwrap();

            partition
                .append(Message::new(Bytes::from("msg1")))
                .await
                .unwrap();
            partition
                .append(Message::new(Bytes::from("msg2")))
                .await
                .unwrap();

            let stored = partition.read(0, 10).await.unwrap();
            assert_eq!(stored.len(), 2);
            assert_eq!(stored[0].value, Bytes::from("msg1"));
            assert_eq!(stored[1].value, Bytes::from("msg2"));
        }

        // Second instance: offsets and data must survive the reopen.
        {
            let partition = Partition::new(&config, topic, part_id).await.unwrap();

            assert_eq!(partition.latest_offset().await, 2);

            let stored = partition.read(0, 10).await.unwrap();
            assert_eq!(stored.len(), 2);
            assert_eq!(stored[0].value, Bytes::from("msg1"));

            partition
                .append(Message::new(Bytes::from("msg3")))
                .await
                .unwrap();
            let stored = partition.read(0, 10).await.unwrap();
            assert_eq!(stored.len(), 3);
            assert_eq!(stored[2].value, Bytes::from("msg3"));
        }
    }

    #[tokio::test]
    async fn test_append_batch() {
        let config = get_test_config();
        let _cleanup = DirGuard(config.data_dir.clone());
        let partition = Partition::new(&config, "test-topic-batch", 0)
            .await
            .unwrap();

        // An empty batch is a no-op.
        assert!(partition.append_batch(Vec::new()).await.unwrap().is_empty());
        assert_eq!(partition.latest_offset().await, 0);

        partition
            .append(Message::new(Bytes::from("first")))
            .await
            .unwrap();

        let offsets = partition
            .append_batch(vec![
                Message::new(Bytes::from("a")),
                Message::new(Bytes::from("b")),
                Message::new(Bytes::from("c")),
            ])
            .await
            .unwrap();
        assert_eq!(offsets, vec![1, 2, 3]);
        assert_eq!(partition.latest_offset().await, 4);

        let stored = partition.read(0, 10).await.unwrap();
        assert_eq!(stored.len(), 4);
        assert_eq!(stored[0].value, Bytes::from("first"));
        assert_eq!(stored[3].value, Bytes::from("c"));

        // Asking for zero messages returns nothing.
        assert!(partition.read(0, 0).await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_find_offset_for_timestamp() {
        let config = get_test_config();
        let _cleanup = DirGuard(config.data_dir.clone());
        let topic = "test-topic-ts";
        let part_id = 0;

        let partition = Partition::new(&config, topic, part_id).await.unwrap();

        for i in 0..5 {
            let msg = Message::new(Bytes::from(format!("msg{}", i)));
            partition.append(msg).await.unwrap();
            // Keep millisecond timestamps distinct between messages.
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        }

        let messages = partition.read(0, 10).await.unwrap();
        assert_eq!(messages.len(), 5);

        let ts_msg2 = messages[2].timestamp.timestamp_millis();

        let found_offset = partition.find_offset_for_timestamp(ts_msg2).await.unwrap();
        assert_eq!(
            found_offset,
            Some(2),
            "Should find offset 2 for timestamp {}",
            ts_msg2
        );

        let very_old_ts = ts_msg2 - 10000;
        let found_offset = partition
            .find_offset_for_timestamp(very_old_ts)
            .await
            .unwrap();
        assert_eq!(
            found_offset,
            Some(0),
            "Should find offset 0 for very old timestamp"
        );

        let future_ts = chrono::Utc::now().timestamp_millis() + 60000;
        let found_offset = partition
            .find_offset_for_timestamp(future_ts)
            .await
            .unwrap();
        assert_eq!(
            found_offset, None,
            "Should return None for future timestamp"
        );
    }
}