1use crate::{Config, Error, Message, Partition, Result};
2use std::collections::HashMap;
3use std::sync::Arc;
4use tokio::sync::RwLock;
5use tracing::info;
6
7#[derive(Debug)]
9pub struct Topic {
10 name: String,
12
13 partitions: Vec<Arc<Partition>>,
15}
16
17impl Topic {
18 pub async fn new(config: &Config, name: String, num_partitions: u32) -> Result<Self> {
20 info!(
21 "Creating topic '{}' with {} partitions",
22 name, num_partitions
23 );
24
25 let mut partitions = Vec::new();
26 for id in 0..num_partitions {
27 partitions.push(Arc::new(Partition::new(config, &name, id).await?));
28 }
29
30 Ok(Self { name, partitions })
31 }
32
33 pub fn name(&self) -> &str {
35 &self.name
36 }
37
38 pub fn num_partitions(&self) -> usize {
40 self.partitions.len()
41 }
42
43 pub fn partition(&self, partition_id: u32) -> Result<Arc<Partition>> {
45 self.partitions
46 .get(partition_id as usize)
47 .cloned()
48 .ok_or(Error::PartitionNotFound(partition_id))
49 }
50
51 pub async fn append(&self, partition_id: u32, message: Message) -> Result<u64> {
53 let partition = self.partition(partition_id)?;
54 partition.append(message).await
55 }
56
57 pub async fn read(
59 &self,
60 partition_id: u32,
61 start_offset: u64,
62 max_messages: usize,
63 ) -> Result<Vec<Message>> {
64 let partition = self.partition(partition_id)?;
65 partition.read(start_offset, max_messages).await
66 }
67
68 pub fn all_partitions(&self) -> Vec<Arc<Partition>> {
70 self.partitions.clone()
71 }
72
73 pub async fn flush(&self) -> Result<()> {
75 for partition in &self.partitions {
76 partition.flush().await?;
77 }
78 Ok(())
79 }
80
81 pub async fn find_offset_for_timestamp(
84 &self,
85 partition_id: u32,
86 target_timestamp: i64,
87 ) -> Result<Option<u64>> {
88 let partition = self.partition(partition_id)?;
89 partition.find_offset_for_timestamp(target_timestamp).await
90 }
91}
92
93#[derive(Debug, Clone)]
95pub struct TopicManager {
96 topics: Arc<RwLock<HashMap<String, Arc<Topic>>>>,
97 config: Config,
98}
99
100impl TopicManager {
101 pub fn new(config: Config) -> Self {
103 info!(
104 "Creating TopicManager with {} default partitions",
105 config.default_partitions
106 );
107
108 Self {
109 topics: Arc::new(RwLock::new(HashMap::new())),
110 config,
111 }
112 }
113
114 pub async fn create_topic(
116 &self,
117 name: String,
118 num_partitions: Option<u32>,
119 ) -> Result<Arc<Topic>> {
120 let mut topics = self.topics.write().await;
121
122 if topics.contains_key(&name) {
123 return Err(Error::Other(format!("Topic '{}' already exists", name)));
124 }
125
126 let num_partitions = num_partitions.unwrap_or(self.config.default_partitions);
127 let topic = Arc::new(Topic::new(&self.config, name.clone(), num_partitions).await?);
128
129 topics.insert(name, topic.clone());
130 Ok(topic)
131 }
132
133 pub async fn get_topic(&self, name: &str) -> Result<Arc<Topic>> {
135 let topics = self.topics.read().await;
136 topics
137 .get(name)
138 .cloned()
139 .ok_or_else(|| Error::TopicNotFound(name.to_string()))
140 }
141
142 pub async fn get_or_create_topic(&self, name: String) -> Result<Arc<Topic>> {
144 {
145 let topics = self.topics.read().await;
146 if let Some(topic) = topics.get(&name) {
147 return Ok(topic.clone());
148 }
149 }
150
151 self.create_topic(name, None).await
152 }
153
154 pub async fn list_topics(&self) -> Vec<String> {
156 let topics = self.topics.read().await;
157 topics.keys().cloned().collect()
158 }
159
160 pub async fn delete_topic(&self, name: &str) -> Result<()> {
162 let mut topics = self.topics.write().await;
163 topics
164 .remove(name)
165 .ok_or_else(|| Error::TopicNotFound(name.to_string()))?;
166
167 info!("Deleted topic '{}'", name);
168 Ok(())
169 }
170
171 pub async fn flush_all(&self) -> Result<()> {
173 let topics = self.topics.read().await;
174 for (name, topic) in topics.iter() {
175 info!("Flushing topic '{}'...", name);
176 topic.flush().await?;
177 }
178 Ok(())
179 }
180}
181
182#[cfg(test)]
183mod tests {
184 use super::*;
185 use bytes::Bytes;
186
187 fn get_test_config() -> Config {
188 Config {
189 data_dir: format!("/tmp/rivven-test-{}", uuid::Uuid::new_v4()),
190 ..Default::default()
191 }
192 }
193
194 #[tokio::test]
195 async fn test_topic_creation() {
196 let config = get_test_config();
197 let topic = Topic::new(&config, "test-topic".to_string(), 3)
198 .await
199 .unwrap();
200 assert_eq!(topic.name(), "test-topic");
201 assert_eq!(topic.num_partitions(), 3);
202 }
203
204 #[tokio::test]
205 async fn test_topic_append_and_read() {
206 let config = get_test_config();
207 let topic = Topic::new(&config, "test-topic".to_string(), 2)
208 .await
209 .unwrap();
210
211 let msg = Message::new(Bytes::from("test"));
212 let offset = topic.append(0, msg).await.unwrap();
213 assert_eq!(offset, 0);
214
215 let messages = topic.read(0, 0, 10).await.unwrap();
216 assert_eq!(messages.len(), 1);
217 }
218
219 #[tokio::test]
220 async fn test_topic_manager() {
221 let config = get_test_config();
222 let manager = TopicManager::new(config);
223
224 let topic = manager
225 .create_topic("test".to_string(), None)
226 .await
227 .unwrap();
228 assert_eq!(topic.num_partitions(), 3);
229
230 let retrieved = manager.get_topic("test").await.unwrap();
231 assert_eq!(retrieved.name(), "test");
232
233 let topics = manager.list_topics().await;
234 assert_eq!(topics.len(), 1);
235 }
236}