1use crate::storage::TieredStorage;
2use crate::{Config, Error, Message, Partition, Result};
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::path::PathBuf;
6use std::sync::Arc;
7use tokio::fs;
8use tokio::sync::RwLock;
9use tracing::{info, warn};
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct TopicMetadata {
14 pub name: String,
15 pub num_partitions: u32,
16 pub created_at: i64,
17}
18
19#[derive(Debug)]
21pub struct Topic {
22 name: String,
24
25 partitions: parking_lot::RwLock<Vec<Arc<Partition>>>,
27}
28
29impl Topic {
30 pub async fn new(config: &Config, name: String, num_partitions: u32) -> Result<Self> {
32 Self::new_with_tiered_storage(config, name, num_partitions, None).await
33 }
34
35 pub async fn new_with_tiered_storage(
37 config: &Config,
38 name: String,
39 num_partitions: u32,
40 tiered_storage: Option<Arc<TieredStorage>>,
41 ) -> Result<Self> {
42 info!(
43 "Creating topic '{}' with {} partitions (tiered_storage: {})",
44 name,
45 num_partitions,
46 tiered_storage.is_some()
47 );
48
49 let mut partitions = Vec::new();
50 for id in 0..num_partitions {
51 partitions.push(Arc::new(
52 Partition::new_with_tiered_storage(config, &name, id, tiered_storage.clone())
53 .await?,
54 ));
55 }
56
57 Ok(Self {
58 name,
59 partitions: parking_lot::RwLock::new(partitions),
60 })
61 }
62
63 pub fn name(&self) -> &str {
65 &self.name
66 }
67
68 pub fn num_partitions(&self) -> usize {
70 self.partitions.read().len()
71 }
72
73 pub fn partition(&self, partition_id: u32) -> Result<Arc<Partition>> {
75 self.partitions
76 .read()
77 .get(partition_id as usize)
78 .cloned()
79 .ok_or(Error::PartitionNotFound(partition_id))
80 }
81
82 pub async fn append(&self, partition_id: u32, message: Message) -> Result<u64> {
84 let partition = self.partition(partition_id)?;
85 partition.append(message).await
86 }
87
88 pub async fn read(
90 &self,
91 partition_id: u32,
92 start_offset: u64,
93 max_messages: usize,
94 ) -> Result<Vec<Message>> {
95 let partition = self.partition(partition_id)?;
96 partition.read(start_offset, max_messages).await
97 }
98
99 pub fn all_partitions(&self) -> Vec<Arc<Partition>> {
101 self.partitions.read().clone()
102 }
103
104 pub async fn flush(&self) -> Result<()> {
106 let partitions = self.partitions.read().clone();
107 for partition in &partitions {
108 partition.flush().await?;
109 }
110 Ok(())
111 }
112
113 pub async fn find_offset_for_timestamp(
116 &self,
117 partition_id: u32,
118 target_timestamp: i64,
119 ) -> Result<Option<u64>> {
120 let partition = self.partition(partition_id)?;
121 partition.find_offset_for_timestamp(target_timestamp).await
122 }
123
124 pub async fn add_partitions(
129 &self,
130 config: &Config,
131 new_total: u32,
132 tiered_storage: Option<Arc<TieredStorage>>,
133 ) -> Result<u32> {
134 let current_count = self.num_partitions() as u32;
135 if new_total <= current_count {
136 return Err(Error::Other(format!(
137 "New partition count {} must exceed current count {}",
138 new_total, current_count
139 )));
140 }
141
142 let mut new_partitions = Vec::new();
143 for id in current_count..new_total {
144 new_partitions.push(Arc::new(
145 Partition::new_with_tiered_storage(config, &self.name, id, tiered_storage.clone())
146 .await?,
147 ));
148 }
149
150 let added = new_partitions.len() as u32;
151 self.partitions.write().extend(new_partitions);
152
153 info!(
154 "Added {} partitions to topic '{}' (total: {})",
155 added, self.name, new_total
156 );
157
158 Ok(added)
159 }
160}
161
162#[derive(Debug, Clone)]
164pub struct TopicManager {
165 topics: Arc<RwLock<HashMap<String, Arc<Topic>>>>,
166 config: Config,
167 tiered_storage: Option<Arc<TieredStorage>>,
168}
169
170const TOPIC_METADATA_FILE: &str = "topic_metadata.json";
172
173impl TopicManager {
174 pub fn new(config: Config) -> Self {
176 info!(
177 "Creating TopicManager with {} default partitions (tiered_storage: disabled)",
178 config.default_partitions
179 );
180
181 Self {
182 topics: Arc::new(RwLock::new(HashMap::new())),
183 config,
184 tiered_storage: None,
185 }
186 }
187
188 pub fn new_with_tiered_storage(config: Config, tiered_storage: Arc<TieredStorage>) -> Self {
190 info!(
191 "Creating TopicManager with {} default partitions (tiered_storage: enabled)",
192 config.default_partitions
193 );
194
195 Self {
196 topics: Arc::new(RwLock::new(HashMap::new())),
197 config,
198 tiered_storage: Some(tiered_storage),
199 }
200 }
201
202 pub fn has_tiered_storage(&self) -> bool {
204 self.tiered_storage.is_some()
205 }
206
207 pub fn tiered_storage_stats(&self) -> Option<crate::storage::TieredStorageStatsSnapshot> {
209 self.tiered_storage.as_ref().map(|ts| ts.stats())
210 }
211
212 pub async fn recover(&self) -> Result<usize> {
215 if !self.config.enable_persistence {
216 info!("Persistence disabled, skipping topic recovery");
217 return Ok(0);
218 }
219
220 let data_dir = PathBuf::from(&self.config.data_dir);
221 let metadata_path = data_dir.join(TOPIC_METADATA_FILE);
222
223 if metadata_path.exists() {
225 match fs::read_to_string(&metadata_path).await {
226 Ok(content) => match serde_json::from_str::<Vec<TopicMetadata>>(&content) {
227 Ok(topics_metadata) => {
228 let count = topics_metadata.len();
229 info!("Recovering {} topics from metadata file", count);
230
231 for meta in topics_metadata {
232 if let Err(e) = self.recover_topic(&meta).await {
233 warn!("Failed to recover topic '{}': {}", meta.name, e);
234 }
235 }
236
237 return Ok(count);
238 }
239 Err(e) => {
240 warn!("Failed to parse topic metadata: {}", e);
241 }
242 },
243 Err(e) => {
244 warn!("Failed to read topic metadata file: {}", e);
245 }
246 }
247 }
248
249 self.recover_from_directory_scan().await
251 }
252
253 async fn recover_topic(&self, meta: &TopicMetadata) -> Result<()> {
255 let mut topics = self.topics.write().await;
256
257 if topics.contains_key(&meta.name) {
258 return Ok(()); }
260
261 info!(
262 "Recovering topic '{}' with {} partitions",
263 meta.name, meta.num_partitions
264 );
265
266 let topic = Arc::new(
267 Topic::new_with_tiered_storage(
268 &self.config,
269 meta.name.clone(),
270 meta.num_partitions,
271 self.tiered_storage.clone(),
272 )
273 .await?,
274 );
275 topics.insert(meta.name.clone(), topic);
276
277 Ok(())
278 }
279
280 async fn recover_from_directory_scan(&self) -> Result<usize> {
282 let data_dir = PathBuf::from(&self.config.data_dir);
283
284 if !data_dir.exists() {
285 return Ok(0);
286 }
287
288 let mut recovered = 0;
289 let mut entries = match fs::read_dir(&data_dir).await {
290 Ok(entries) => entries,
291 Err(e) => {
292 warn!("Failed to read data directory: {}", e);
293 return Ok(0);
294 }
295 };
296
297 while let Ok(Some(entry)) = entries.next_entry().await {
298 let path = entry.path();
299 if !path.is_dir() {
300 continue;
301 }
302
303 let dir_name = match path.file_name().and_then(|n| n.to_str()) {
304 Some(name) => name.to_string(),
305 None => continue,
306 };
307
308 if dir_name.starts_with('_') || dir_name.starts_with('.') {
310 continue;
311 }
312
313 let mut partition_count = 0u32;
315 if let Ok(mut topic_entries) = fs::read_dir(&path).await {
316 while let Ok(Some(partition_entry)) = topic_entries.next_entry().await {
317 let partition_path = partition_entry.path();
318 if partition_path.is_dir() {
319 if let Some(name) = partition_path.file_name().and_then(|n| n.to_str()) {
320 if name.starts_with("partition-") {
321 partition_count += 1;
322 }
323 }
324 }
325 }
326 }
327
328 if partition_count > 0 {
329 info!(
330 "Discovered topic '{}' with {} partitions from directory scan",
331 dir_name, partition_count
332 );
333
334 let meta = TopicMetadata {
335 name: dir_name,
336 num_partitions: partition_count,
337 created_at: 0, };
339
340 if let Err(e) = self.recover_topic(&meta).await {
341 warn!("Failed to recover topic '{}': {}", meta.name, e);
342 } else {
343 recovered += 1;
344 }
345 }
346 }
347
348 if recovered > 0 {
350 let _ = self.persist_metadata().await;
351 }
352
353 Ok(recovered)
354 }
355
356 async fn persist_metadata(&self) -> Result<()> {
358 if !self.config.enable_persistence {
359 return Ok(());
360 }
361
362 let data_dir = PathBuf::from(&self.config.data_dir);
363 fs::create_dir_all(&data_dir)
364 .await
365 .map_err(|e| Error::Other(format!("Failed to create data directory: {}", e)))?;
366
367 let topics = self.topics.read().await;
368 let metadata: Vec<TopicMetadata> = topics
369 .iter()
370 .map(|(name, topic)| TopicMetadata {
371 name: name.clone(),
372 num_partitions: topic.num_partitions() as u32,
373 created_at: chrono::Utc::now().timestamp_millis(),
374 })
375 .collect();
376
377 let metadata_path = data_dir.join(TOPIC_METADATA_FILE);
378 let content = serde_json::to_string_pretty(&metadata)
379 .map_err(|e| Error::Other(format!("Failed to serialize topic metadata: {}", e)))?;
380
381 fs::write(&metadata_path, content)
382 .await
383 .map_err(|e| Error::Other(format!("Failed to write topic metadata: {}", e)))?;
384
385 info!("Persisted metadata for {} topics", topics.len());
386 Ok(())
387 }
388
389 pub async fn create_topic(
391 &self,
392 name: String,
393 num_partitions: Option<u32>,
394 ) -> Result<Arc<Topic>> {
395 let mut topics = self.topics.write().await;
396
397 if topics.contains_key(&name) {
398 return Err(Error::Other(format!("Topic '{}' already exists", name)));
399 }
400
401 let num_partitions = num_partitions.unwrap_or(self.config.default_partitions);
402 let topic = Arc::new(
403 Topic::new_with_tiered_storage(
404 &self.config,
405 name.clone(),
406 num_partitions,
407 self.tiered_storage.clone(),
408 )
409 .await?,
410 );
411
412 topics.insert(name.clone(), topic.clone());
413 drop(topics); let _ = self.persist_metadata().await;
417
418 Ok(topic)
419 }
420
421 pub async fn get_topic(&self, name: &str) -> Result<Arc<Topic>> {
423 let topics = self.topics.read().await;
424 topics
425 .get(name)
426 .cloned()
427 .ok_or_else(|| Error::TopicNotFound(name.to_string()))
428 }
429
430 pub async fn get_or_create_topic(&self, name: String) -> Result<Arc<Topic>> {
432 let mut topics = self.topics.write().await;
434 if let Some(topic) = topics.get(&name) {
435 return Ok(topic.clone());
436 }
437
438 let num_partitions = self.config.default_partitions;
439 let topic = Arc::new(
440 Topic::new_with_tiered_storage(
441 &self.config,
442 name.clone(),
443 num_partitions,
444 self.tiered_storage.clone(),
445 )
446 .await?,
447 );
448
449 topics.insert(name.clone(), topic.clone());
450 drop(topics); let _ = self.persist_metadata().await;
454
455 Ok(topic)
456 }
457
458 pub async fn list_topics(&self) -> Vec<String> {
460 let topics = self.topics.read().await;
461 topics.keys().cloned().collect()
462 }
463
464 pub async fn delete_topic(&self, name: &str) -> Result<()> {
466 let mut topics = self.topics.write().await;
467 topics
468 .remove(name)
469 .ok_or_else(|| Error::TopicNotFound(name.to_string()))?;
470 drop(topics); info!("Deleted topic '{}'", name);
473
474 let _ = self.persist_metadata().await;
476
477 Ok(())
478 }
479
480 pub async fn flush_all(&self) -> Result<()> {
482 let topics = self.topics.read().await;
483 for (name, topic) in topics.iter() {
484 info!("Flushing topic '{}'...", name);
485 topic.flush().await?;
486 }
487 Ok(())
488 }
489
490 pub async fn add_partitions(&self, name: &str, new_partition_count: u32) -> Result<u32> {
495 let topics = self.topics.read().await;
496 let topic = topics
497 .get(name)
498 .ok_or_else(|| Error::TopicNotFound(name.to_string()))?
499 .clone();
500 drop(topics);
501
502 let added = topic
503 .add_partitions(
504 &self.config,
505 new_partition_count,
506 self.tiered_storage.clone(),
507 )
508 .await?;
509
510 let _ = self.persist_metadata().await;
512
513 Ok(added)
514 }
515}
516
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// Default configuration pointed at a fresh, uniquely named data
    /// directory so tests do not share on-disk state.
    fn scratch_config() -> Config {
        let data_dir = format!("/tmp/rivven-test-{}", uuid::Uuid::new_v4());
        Config {
            data_dir,
            ..Default::default()
        }
    }

    /// A new topic reports the name and partition count it was built with.
    #[tokio::test]
    async fn test_topic_creation() {
        let cfg = scratch_config();
        let created = Topic::new(&cfg, String::from("test-topic"), 3)
            .await
            .unwrap();

        assert_eq!(created.name(), "test-topic");
        assert_eq!(created.num_partitions(), 3);
    }

    /// The first message appended to a partition gets offset 0 and can be
    /// read back.
    #[tokio::test]
    async fn test_topic_append_and_read() {
        let cfg = scratch_config();
        let topic = Topic::new(&cfg, String::from("test-topic"), 2)
            .await
            .unwrap();

        let first_offset = topic
            .append(0, Message::new(Bytes::from("test")))
            .await
            .unwrap();
        assert_eq!(first_offset, 0);

        let fetched = topic.read(0, 0, 10).await.unwrap();
        assert_eq!(fetched.len(), 1);
    }

    /// Creating a topic through the manager uses the default partition
    /// count and makes the topic visible to lookup and listing.
    #[tokio::test]
    async fn test_topic_manager() {
        let manager = TopicManager::new(scratch_config());

        let created = manager
            .create_topic(String::from("test"), None)
            .await
            .unwrap();
        assert_eq!(created.num_partitions(), 3);

        let looked_up = manager.get_topic("test").await.unwrap();
        assert_eq!(looked_up.name(), "test");

        let names = manager.list_topics().await;
        assert_eq!(names.len(), 1);
    }
}