// rivven_core/topic.rs — topic and topic-manager implementation.
1use crate::storage::TieredStorage;
2use crate::{Config, Error, Message, Partition, Result};
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::path::PathBuf;
6use std::sync::Arc;
7use tokio::fs;
8use tokio::sync::RwLock;
9use tracing::{info, warn};
10
/// Topic metadata for persistence.
///
/// Serialized as JSON into the topic metadata file so topics can be
/// re-created with the same partition count after a restart.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicMetadata {
    // Topic name; also used as the topic's directory name on disk.
    pub name: String,
    // Number of partitions the topic has.
    pub num_partitions: u32,
    // Creation time in milliseconds since the Unix epoch; 0 when unknown
    // (e.g. when the topic was discovered via directory scan).
    pub created_at: i64,
}
18
/// Represents a topic with multiple partitions.
#[derive(Debug)]
pub struct Topic {
    /// Topic name
    name: String,

    /// Partitions in this topic (growable via `add_partitions`).
    /// Guarded by a synchronous `parking_lot` lock, so guards must not be
    /// held across `.await` points; methods clone out what they need first.
    partitions: parking_lot::RwLock<Vec<Arc<Partition>>>,
}
28
29impl Topic {
30    /// Create a new topic with the specified number of partitions
31    pub async fn new(config: &Config, name: String, num_partitions: u32) -> Result<Self> {
32        Self::new_with_tiered_storage(config, name, num_partitions, None).await
33    }
34
35    /// Create a new topic with the specified number of partitions and optional tiered storage
36    pub async fn new_with_tiered_storage(
37        config: &Config,
38        name: String,
39        num_partitions: u32,
40        tiered_storage: Option<Arc<TieredStorage>>,
41    ) -> Result<Self> {
42        info!(
43            "Creating topic '{}' with {} partitions (tiered_storage: {})",
44            name,
45            num_partitions,
46            tiered_storage.is_some()
47        );
48
49        let mut partitions = Vec::new();
50        for id in 0..num_partitions {
51            partitions.push(Arc::new(
52                Partition::new_with_tiered_storage(config, &name, id, tiered_storage.clone())
53                    .await?,
54            ));
55        }
56
57        Ok(Self {
58            name,
59            partitions: parking_lot::RwLock::new(partitions),
60        })
61    }
62
63    /// Get the topic name
64    pub fn name(&self) -> &str {
65        &self.name
66    }
67
68    /// Get the number of partitions
69    pub fn num_partitions(&self) -> usize {
70        self.partitions.read().len()
71    }
72
73    /// Get a specific partition
74    pub fn partition(&self, partition_id: u32) -> Result<Arc<Partition>> {
75        self.partitions
76            .read()
77            .get(partition_id as usize)
78            .cloned()
79            .ok_or(Error::PartitionNotFound(partition_id))
80    }
81
82    /// Append a message to a specific partition
83    pub async fn append(&self, partition_id: u32, message: Message) -> Result<u64> {
84        let partition = self.partition(partition_id)?;
85        partition.append(message).await
86    }
87
88    /// Read messages from a specific partition
89    pub async fn read(
90        &self,
91        partition_id: u32,
92        start_offset: u64,
93        max_messages: usize,
94    ) -> Result<Vec<Message>> {
95        let partition = self.partition(partition_id)?;
96        partition.read(start_offset, max_messages).await
97    }
98
99    /// Get all partitions
100    pub fn all_partitions(&self) -> Vec<Arc<Partition>> {
101        self.partitions.read().clone()
102    }
103
104    /// Flush all partitions to disk ensuring durability
105    pub async fn flush(&self) -> Result<()> {
106        let partitions = self.partitions.read().clone();
107        for partition in &partitions {
108            partition.flush().await?;
109        }
110        Ok(())
111    }
112
113    /// Find the first offset with timestamp >= target_timestamp (milliseconds since epoch)
114    /// Returns None if no matching offset is found.
115    pub async fn find_offset_for_timestamp(
116        &self,
117        partition_id: u32,
118        target_timestamp: i64,
119    ) -> Result<Option<u64>> {
120        let partition = self.partition(partition_id)?;
121        partition.find_offset_for_timestamp(target_timestamp).await
122    }
123
124    /// Dynamically add partitions to this topic.
125    ///
126    /// Creates new partitions with IDs from `current_count` to `new_total - 1`.
127    /// Existing partitions and their data are unaffected.
128    pub async fn add_partitions(
129        &self,
130        config: &Config,
131        new_total: u32,
132        tiered_storage: Option<Arc<TieredStorage>>,
133    ) -> Result<u32> {
134        let current_count = self.num_partitions() as u32;
135        if new_total <= current_count {
136            return Err(Error::Other(format!(
137                "New partition count {} must exceed current count {}",
138                new_total, current_count
139            )));
140        }
141
142        let mut new_partitions = Vec::new();
143        for id in current_count..new_total {
144            new_partitions.push(Arc::new(
145                Partition::new_with_tiered_storage(config, &self.name, id, tiered_storage.clone())
146                    .await?,
147            ));
148        }
149
150        let added = new_partitions.len() as u32;
151        self.partitions.write().extend(new_partitions);
152
153        info!(
154            "Added {} partitions to topic '{}' (total: {})",
155            added, self.name, new_total
156        );
157
158        Ok(added)
159    }
160}
161
/// Manages all topics in the system.
///
/// Cheap to clone: the topic map and tiered-storage handle are shared
/// behind `Arc`s, so clones operate on the same state.
#[derive(Debug, Clone)]
pub struct TopicManager {
    // Topic name -> topic. Async (tokio) RwLock so guards may be held
    // across await points during topic creation.
    topics: Arc<RwLock<HashMap<String, Arc<Topic>>>>,
    // Broker configuration: data_dir, default_partitions, enable_persistence.
    config: Config,
    // Optional tiered storage handle, passed to every partition created
    // through this manager.
    tiered_storage: Option<Arc<TieredStorage>>,
}

/// Metadata file name for topic persistence
const TOPIC_METADATA_FILE: &str = "topic_metadata.json";
172
impl TopicManager {
    /// Create a new topic manager without tiered storage.
    ///
    /// Construction is purely in-memory; call [`TopicManager::recover`]
    /// afterwards to restore any topics persisted by a previous run.
    pub fn new(config: Config) -> Self {
        info!(
            "Creating TopicManager with {} default partitions (tiered_storage: disabled)",
            config.default_partitions
        );

        Self {
            topics: Arc::new(RwLock::new(HashMap::new())),
            config,
            tiered_storage: None,
        }
    }

    /// Create a new topic manager with tiered storage support.
    ///
    /// The tiered storage handle is forwarded to every partition created
    /// through this manager.
    pub fn new_with_tiered_storage(config: Config, tiered_storage: Arc<TieredStorage>) -> Self {
        info!(
            "Creating TopicManager with {} default partitions (tiered_storage: enabled)",
            config.default_partitions
        );

        Self {
            topics: Arc::new(RwLock::new(HashMap::new())),
            config,
            tiered_storage: Some(tiered_storage),
        }
    }

    /// Check if tiered storage is enabled.
    pub fn has_tiered_storage(&self) -> bool {
        self.tiered_storage.is_some()
    }

    /// Get tiered storage statistics, or `None` when tiered storage is disabled.
    pub fn tiered_storage_stats(&self) -> Option<crate::storage::TieredStorageStatsSnapshot> {
        self.tiered_storage.as_ref().map(|ts| ts.stats())
    }

    /// Initialize and recover topics from disk.
    ///
    /// This should be called after construction to restore persisted topics.
    /// Recovery first tries the JSON metadata file; if that file is missing
    /// or unreadable it falls back to scanning the data directory.
    ///
    /// Returns the number of topics found. Note: when the metadata file is
    /// used, the returned count includes topics whose individual recovery
    /// failed — failures are logged as warnings, not propagated.
    pub async fn recover(&self) -> Result<usize> {
        if !self.config.enable_persistence {
            info!("Persistence disabled, skipping topic recovery");
            return Ok(0);
        }

        let data_dir = PathBuf::from(&self.config.data_dir);
        let metadata_path = data_dir.join(TOPIC_METADATA_FILE);

        // Try to load metadata file
        if metadata_path.exists() {
            match fs::read_to_string(&metadata_path).await {
                Ok(content) => match serde_json::from_str::<Vec<TopicMetadata>>(&content) {
                    Ok(topics_metadata) => {
                        let count = topics_metadata.len();
                        info!("Recovering {} topics from metadata file", count);

                        for meta in topics_metadata {
                            // Best-effort: one bad topic must not abort
                            // recovery of the rest.
                            if let Err(e) = self.recover_topic(&meta).await {
                                warn!("Failed to recover topic '{}': {}", meta.name, e);
                            }
                        }

                        return Ok(count);
                    }
                    Err(e) => {
                        // Corrupt metadata: fall through to the directory scan.
                        warn!("Failed to parse topic metadata: {}", e);
                    }
                },
                Err(e) => {
                    warn!("Failed to read topic metadata file: {}", e);
                }
            }
        }

        // Fallback: scan data directory for topic directories
        self.recover_from_directory_scan().await
    }

    /// Recover a single topic from metadata.
    ///
    /// No-op if a topic with this name is already registered. Re-creating
    /// the `Topic` re-opens its partitions from the existing data directory.
    async fn recover_topic(&self, meta: &TopicMetadata) -> Result<()> {
        let mut topics = self.topics.write().await;

        if topics.contains_key(&meta.name) {
            return Ok(()); // Already recovered
        }

        info!(
            "Recovering topic '{}' with {} partitions",
            meta.name, meta.num_partitions
        );

        let topic = Arc::new(
            Topic::new_with_tiered_storage(
                &self.config,
                meta.name.clone(),
                meta.num_partitions,
                self.tiered_storage.clone(),
            )
            .await?,
        );
        topics.insert(meta.name.clone(), topic);

        Ok(())
    }

    /// Scan data directory for existing topic directories (fallback recovery).
    ///
    /// Heuristic: any non-hidden, non-internal directory containing at least
    /// one `partition-*` subdirectory is treated as a topic; its partition
    /// count is the number of such subdirectories. `created_at` is unknown
    /// in this path and recorded as 0.
    async fn recover_from_directory_scan(&self) -> Result<usize> {
        let data_dir = PathBuf::from(&self.config.data_dir);

        if !data_dir.exists() {
            return Ok(0);
        }

        let mut recovered = 0;
        let mut entries = match fs::read_dir(&data_dir).await {
            Ok(entries) => entries,
            Err(e) => {
                warn!("Failed to read data directory: {}", e);
                return Ok(0);
            }
        };

        while let Ok(Some(entry)) = entries.next_entry().await {
            let path = entry.path();
            if !path.is_dir() {
                continue;
            }

            let dir_name = match path.file_name().and_then(|n| n.to_str()) {
                Some(name) => name.to_string(),
                None => continue,
            };

            // Skip internal directories
            if dir_name.starts_with('_') || dir_name.starts_with('.') {
                continue;
            }

            // Check if this looks like a topic directory by looking for partition subdirs
            let mut partition_count = 0u32;
            if let Ok(mut topic_entries) = fs::read_dir(&path).await {
                while let Ok(Some(partition_entry)) = topic_entries.next_entry().await {
                    let partition_path = partition_entry.path();
                    if partition_path.is_dir() {
                        if let Some(name) = partition_path.file_name().and_then(|n| n.to_str()) {
                            if name.starts_with("partition-") {
                                partition_count += 1;
                            }
                        }
                    }
                }
            }

            if partition_count > 0 {
                info!(
                    "Discovered topic '{}' with {} partitions from directory scan",
                    dir_name, partition_count
                );

                let meta = TopicMetadata {
                    name: dir_name,
                    num_partitions: partition_count,
                    created_at: 0, // Unknown
                };

                if let Err(e) = self.recover_topic(&meta).await {
                    warn!("Failed to recover topic '{}': {}", meta.name, e);
                } else {
                    recovered += 1;
                }
            }
        }

        // Save discovered topics to metadata file for faster recovery next time
        if recovered > 0 {
            let _ = self.persist_metadata().await;
        }

        Ok(recovered)
    }

    /// Persist topic metadata to disk.
    ///
    /// Serializes the current topic map as pretty-printed JSON into
    /// `topic_metadata.json` under the data directory. No-op when
    /// persistence is disabled.
    ///
    /// NOTE(review): `created_at` is re-stamped with the current time on
    /// every persist, so original creation times are not preserved across
    /// metadata rewrites — confirm whether that is intended.
    async fn persist_metadata(&self) -> Result<()> {
        if !self.config.enable_persistence {
            return Ok(());
        }

        let data_dir = PathBuf::from(&self.config.data_dir);
        fs::create_dir_all(&data_dir)
            .await
            .map_err(|e| Error::Other(format!("Failed to create data directory: {}", e)))?;

        let topics = self.topics.read().await;
        let metadata: Vec<TopicMetadata> = topics
            .iter()
            .map(|(name, topic)| TopicMetadata {
                name: name.clone(),
                num_partitions: topic.num_partitions() as u32,
                created_at: chrono::Utc::now().timestamp_millis(),
            })
            .collect();

        let metadata_path = data_dir.join(TOPIC_METADATA_FILE);
        let content = serde_json::to_string_pretty(&metadata)
            .map_err(|e| Error::Other(format!("Failed to serialize topic metadata: {}", e)))?;

        fs::write(&metadata_path, content)
            .await
            .map_err(|e| Error::Other(format!("Failed to write topic metadata: {}", e)))?;

        info!("Persisted metadata for {} topics", topics.len());
        Ok(())
    }

    /// Create a new topic.
    ///
    /// Uses `config.default_partitions` when `num_partitions` is `None`.
    /// Fails if a topic with the same name already exists. The write lock
    /// is held across partition creation so concurrent creates of the same
    /// name cannot race; metadata persistence afterwards is best-effort
    /// (errors are intentionally ignored).
    pub async fn create_topic(
        &self,
        name: String,
        num_partitions: Option<u32>,
    ) -> Result<Arc<Topic>> {
        let mut topics = self.topics.write().await;

        if topics.contains_key(&name) {
            return Err(Error::Other(format!("Topic '{}' already exists", name)));
        }

        let num_partitions = num_partitions.unwrap_or(self.config.default_partitions);
        let topic = Arc::new(
            Topic::new_with_tiered_storage(
                &self.config,
                name.clone(),
                num_partitions,
                self.tiered_storage.clone(),
            )
            .await?,
        );

        topics.insert(name.clone(), topic.clone());
        drop(topics); // Release lock before persistence

        // Persist metadata asynchronously
        let _ = self.persist_metadata().await;

        Ok(topic)
    }

    /// Get a topic by name.
    ///
    /// # Errors
    /// Returns [`Error::TopicNotFound`] if no topic with this name exists.
    pub async fn get_topic(&self, name: &str) -> Result<Arc<Topic>> {
        let topics = self.topics.read().await;
        topics
            .get(name)
            .cloned()
            .ok_or_else(|| Error::TopicNotFound(name.to_string()))
    }

    /// Get or create a topic (race-safe: uses write lock directly).
    ///
    /// Created topics use `config.default_partitions` partitions.
    pub async fn get_or_create_topic(&self, name: String) -> Result<Arc<Topic>> {
        // Use write lock to atomically check-and-create, avoiding TOCTOU race
        let mut topics = self.topics.write().await;
        if let Some(topic) = topics.get(&name) {
            return Ok(topic.clone());
        }

        let num_partitions = self.config.default_partitions;
        let topic = Arc::new(
            Topic::new_with_tiered_storage(
                &self.config,
                name.clone(),
                num_partitions,
                self.tiered_storage.clone(),
            )
            .await?,
        );

        topics.insert(name.clone(), topic.clone());
        drop(topics); // Release lock before persistence

        // Persist metadata asynchronously
        let _ = self.persist_metadata().await;

        Ok(topic)
    }

    /// List the names of all topics.
    pub async fn list_topics(&self) -> Vec<String> {
        let topics = self.topics.read().await;
        topics.keys().cloned().collect()
    }

    /// Delete a topic.
    ///
    /// Removes the topic from the in-memory map and rewrites the metadata
    /// file; this method performs no filesystem removal, so the topic's
    /// on-disk data directory is left in place.
    ///
    /// # Errors
    /// Returns [`Error::TopicNotFound`] if no topic with this name exists.
    pub async fn delete_topic(&self, name: &str) -> Result<()> {
        let mut topics = self.topics.write().await;
        topics
            .remove(name)
            .ok_or_else(|| Error::TopicNotFound(name.to_string()))?;
        drop(topics); // Release lock before persistence

        info!("Deleted topic '{}'", name);

        // Update persisted metadata
        let _ = self.persist_metadata().await;

        Ok(())
    }

    /// Flush all topics to disk ensuring durability during shutdown.
    ///
    /// Stops at the first flush failure and propagates the error.
    pub async fn flush_all(&self) -> Result<()> {
        let topics = self.topics.read().await;
        for (name, topic) in topics.iter() {
            info!("Flushing topic '{}'...", name);
            topic.flush().await?;
        }
        Ok(())
    }

    /// Add partitions to an existing topic.
    ///
    /// Increases the partition count of the topic to `new_partition_count`.
    /// Returns the number of partitions actually added.
    pub async fn add_partitions(&self, name: &str, new_partition_count: u32) -> Result<u32> {
        // Clone the Arc and drop the map lock before the (async) partition
        // creation inside Topic::add_partitions.
        let topics = self.topics.read().await;
        let topic = topics
            .get(name)
            .ok_or_else(|| Error::TopicNotFound(name.to_string()))?
            .clone();
        drop(topics);

        let added = topic
            .add_partitions(
                &self.config,
                new_partition_count,
                self.tiered_storage.clone(),
            )
            .await?;

        // Update persisted metadata
        let _ = self.persist_metadata().await;

        Ok(added)
    }
}
516
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// Build a throwaway config pointing at a unique temp directory so
    /// tests cannot interfere with each other.
    fn get_test_config() -> Config {
        let data_dir = format!("/tmp/rivven-test-{}", uuid::Uuid::new_v4());
        Config {
            data_dir,
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn test_topic_creation() {
        let config = get_test_config();

        let topic = Topic::new(&config, String::from("test-topic"), 3)
            .await
            .expect("topic creation should succeed");

        assert_eq!(topic.num_partitions(), 3);
        assert_eq!(topic.name(), "test-topic");
    }

    #[tokio::test]
    async fn test_topic_append_and_read() {
        let config = get_test_config();
        let topic = Topic::new(&config, String::from("test-topic"), 2)
            .await
            .expect("topic creation should succeed");

        // First append to an empty partition lands at offset 0.
        let offset = topic
            .append(0, Message::new(Bytes::from("test")))
            .await
            .expect("append should succeed");
        assert_eq!(offset, 0);

        // Reading from the start returns exactly the one message.
        let messages = topic.read(0, 0, 10).await.expect("read should succeed");
        assert_eq!(messages.len(), 1);
    }

    #[tokio::test]
    async fn test_topic_manager() {
        let manager = TopicManager::new(get_test_config());

        // None partition count falls back to the config default.
        let created = manager
            .create_topic("test".to_string(), None)
            .await
            .expect("create_topic should succeed");
        assert_eq!(created.num_partitions(), 3);

        let fetched = manager.get_topic("test").await.expect("topic should exist");
        assert_eq!(fetched.name(), "test");

        assert_eq!(manager.list_topics().await.len(), 1);
    }
}