Skip to main content

rivven_core/
topic.rs

1use crate::{Config, Error, Message, Partition, Result};
2use std::collections::HashMap;
3use std::sync::Arc;
4use tokio::sync::RwLock;
5use tracing::info;
6
7/// Represents a topic with multiple partitions
8#[derive(Debug)]
9pub struct Topic {
10    /// Topic name
11    name: String,
12
13    /// Partitions in this topic
14    partitions: Vec<Arc<Partition>>,
15}
16
17impl Topic {
18    /// Create a new topic with the specified number of partitions
19    pub async fn new(config: &Config, name: String, num_partitions: u32) -> Result<Self> {
20        info!(
21            "Creating topic '{}' with {} partitions",
22            name, num_partitions
23        );
24
25        let mut partitions = Vec::new();
26        for id in 0..num_partitions {
27            partitions.push(Arc::new(Partition::new(config, &name, id).await?));
28        }
29
30        Ok(Self { name, partitions })
31    }
32
33    /// Get the topic name
34    pub fn name(&self) -> &str {
35        &self.name
36    }
37
38    /// Get the number of partitions
39    pub fn num_partitions(&self) -> usize {
40        self.partitions.len()
41    }
42
43    /// Get a specific partition
44    pub fn partition(&self, partition_id: u32) -> Result<Arc<Partition>> {
45        self.partitions
46            .get(partition_id as usize)
47            .cloned()
48            .ok_or(Error::PartitionNotFound(partition_id))
49    }
50
51    /// Append a message to a specific partition
52    pub async fn append(&self, partition_id: u32, message: Message) -> Result<u64> {
53        let partition = self.partition(partition_id)?;
54        partition.append(message).await
55    }
56
57    /// Read messages from a specific partition
58    pub async fn read(
59        &self,
60        partition_id: u32,
61        start_offset: u64,
62        max_messages: usize,
63    ) -> Result<Vec<Message>> {
64        let partition = self.partition(partition_id)?;
65        partition.read(start_offset, max_messages).await
66    }
67
68    /// Get all partitions
69    pub fn all_partitions(&self) -> Vec<Arc<Partition>> {
70        self.partitions.clone()
71    }
72
73    /// Flush all partitions to disk ensuring durability
74    pub async fn flush(&self) -> Result<()> {
75        for partition in &self.partitions {
76            partition.flush().await?;
77        }
78        Ok(())
79    }
80
81    /// Find the first offset with timestamp >= target_timestamp (milliseconds since epoch)
82    /// Returns None if no matching offset is found.
83    pub async fn find_offset_for_timestamp(
84        &self,
85        partition_id: u32,
86        target_timestamp: i64,
87    ) -> Result<Option<u64>> {
88        let partition = self.partition(partition_id)?;
89        partition.find_offset_for_timestamp(target_timestamp).await
90    }
91}
92
93/// Manages all topics in the system
94#[derive(Debug, Clone)]
95pub struct TopicManager {
96    topics: Arc<RwLock<HashMap<String, Arc<Topic>>>>,
97    config: Config,
98}
99
100impl TopicManager {
101    /// Create a new topic manager
102    pub fn new(config: Config) -> Self {
103        info!(
104            "Creating TopicManager with {} default partitions",
105            config.default_partitions
106        );
107
108        Self {
109            topics: Arc::new(RwLock::new(HashMap::new())),
110            config,
111        }
112    }
113
114    /// Create a new topic
115    pub async fn create_topic(
116        &self,
117        name: String,
118        num_partitions: Option<u32>,
119    ) -> Result<Arc<Topic>> {
120        let mut topics = self.topics.write().await;
121
122        if topics.contains_key(&name) {
123            return Err(Error::Other(format!("Topic '{}' already exists", name)));
124        }
125
126        let num_partitions = num_partitions.unwrap_or(self.config.default_partitions);
127        let topic = Arc::new(Topic::new(&self.config, name.clone(), num_partitions).await?);
128
129        topics.insert(name, topic.clone());
130        Ok(topic)
131    }
132
133    /// Get a topic by name
134    pub async fn get_topic(&self, name: &str) -> Result<Arc<Topic>> {
135        let topics = self.topics.read().await;
136        topics
137            .get(name)
138            .cloned()
139            .ok_or_else(|| Error::TopicNotFound(name.to_string()))
140    }
141
142    /// Get or create a topic
143    pub async fn get_or_create_topic(&self, name: String) -> Result<Arc<Topic>> {
144        {
145            let topics = self.topics.read().await;
146            if let Some(topic) = topics.get(&name) {
147                return Ok(topic.clone());
148            }
149        }
150
151        self.create_topic(name, None).await
152    }
153
154    /// List all topics
155    pub async fn list_topics(&self) -> Vec<String> {
156        let topics = self.topics.read().await;
157        topics.keys().cloned().collect()
158    }
159
160    /// Delete a topic
161    pub async fn delete_topic(&self, name: &str) -> Result<()> {
162        let mut topics = self.topics.write().await;
163        topics
164            .remove(name)
165            .ok_or_else(|| Error::TopicNotFound(name.to_string()))?;
166
167        info!("Deleted topic '{}'", name);
168        Ok(())
169    }
170
171    /// Flush all topics to disk ensuring durability during shutdown
172    pub async fn flush_all(&self) -> Result<()> {
173        let topics = self.topics.read().await;
174        for (name, topic) in topics.iter() {
175            info!("Flushing topic '{}'...", name);
176            topic.flush().await?;
177        }
178        Ok(())
179    }
180}
181
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// Build a default config whose data directory is unique per call, so
    /// tests running in parallel never share on-disk state.
    fn get_test_config() -> Config {
        // Use the platform temp directory instead of a hard-coded `/tmp`.
        let data_dir = std::env::temp_dir()
            .join(format!("rivven-test-{}", uuid::Uuid::new_v4()))
            .to_string_lossy()
            .into_owned();

        Config {
            data_dir,
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn test_topic_creation() {
        let config = get_test_config();
        let topic = Topic::new(&config, "test-topic".to_string(), 3)
            .await
            .unwrap();
        assert_eq!(topic.name(), "test-topic");
        assert_eq!(topic.num_partitions(), 3);
    }

    #[tokio::test]
    async fn test_topic_append_and_read() {
        let config = get_test_config();
        let topic = Topic::new(&config, "test-topic".to_string(), 2)
            .await
            .unwrap();

        let msg = Message::new(Bytes::from("test"));
        let offset = topic.append(0, msg).await.unwrap();
        assert_eq!(offset, 0);

        let messages = topic.read(0, 0, 10).await.unwrap();
        assert_eq!(messages.len(), 1);
    }

    #[tokio::test]
    async fn test_partition_out_of_range() {
        let config = get_test_config();
        let topic = Topic::new(&config, "test-topic".to_string(), 2)
            .await
            .unwrap();

        // Valid ids are 0 and 1; anything else must be rejected.
        assert!(topic.partition(1).is_ok());
        assert!(topic.partition(2).is_err());
    }

    #[tokio::test]
    async fn test_topic_manager() {
        let config = get_test_config();
        // Read the expected count from the config rather than hard-coding it,
        // so the test does not break if the default changes. u32 -> usize is
        // a lossless widening on 32- and 64-bit targets.
        let expected_partitions = config.default_partitions as usize;
        let manager = TopicManager::new(config);

        let topic = manager
            .create_topic("test".to_string(), None)
            .await
            .unwrap();
        assert_eq!(topic.num_partitions(), expected_partitions);

        let retrieved = manager.get_topic("test").await.unwrap();
        assert_eq!(retrieved.name(), "test");

        let topics = manager.list_topics().await;
        assert_eq!(topics.len(), 1);
    }

    #[tokio::test]
    async fn test_get_or_create_topic_is_idempotent() {
        let manager = TopicManager::new(get_test_config());

        let first = manager
            .get_or_create_topic("test".to_string())
            .await
            .unwrap();
        let second = manager
            .get_or_create_topic("test".to_string())
            .await
            .unwrap();

        // The second call must return the already-registered topic, not a
        // fresh one.
        assert!(std::sync::Arc::ptr_eq(&first, &second));
        assert_eq!(manager.list_topics().await.len(), 1);
    }
}