1pub mod client;
7pub mod config;
8pub mod error;
9pub mod protocol;
10pub mod server;
11pub mod messaging;
12
13
14pub use config::BrokerConfig;
16pub use error::{BrokerError, BrokerResult};
17use messaging::{
18 TopicManager, ProducerService, ConsumerService,
19 offset::OffsetManager,
20 consumer::ConsumerMessage
21};
22use kawadb_storage::StorageEngine;
23use server::BrokerServer;
24
25use derive_more::{Deref, DerefMut, From, Into};
26use serde::{Deserialize, Serialize};
27use std::{
28 collections::HashMap,
29 fmt::Debug,
30 net::SocketAddr,
31 sync::Arc,
32 time::{Duration, Instant},
33};
34use tracing::{debug, info};
35use uuid::Uuid;
36
37#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Deref, DerefMut, From, Into)]
39pub struct ClientId(pub String);
40
41impl ClientId {
42 pub fn new() -> Self {
44 Self(Uuid::new_v4().to_string())
45 }
46
47 pub fn from_string(id: impl Into<String>) -> Self {
49 Self(id.into())
50 }
51}
52
53impl Default for ClientId {
54 fn default() -> Self {
55 Self::new()
56 }
57}
58
59impl std::fmt::Display for ClientId {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 write!(f, "{}", self.0)
62 }
63}
64
65#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Deref, DerefMut, From, Into)]
67pub struct SessionId(pub u64);
68
69impl SessionId {
70 pub fn new() -> Self {
72 use std::sync::atomic::{AtomicU64, Ordering};
73 static COUNTER: AtomicU64 = AtomicU64::new(1);
74 Self(COUNTER.fetch_add(1, Ordering::Relaxed))
75 }
76}
77
78impl Default for SessionId {
79 fn default() -> Self {
80 Self::new()
81 }
82}
83
84impl std::fmt::Display for SessionId {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 write!(f, "{}", self.0)
87 }
88}
89
90#[derive(Debug, Clone)]
92pub struct ClientSession {
93 pub session_id: SessionId,
95 pub client_id: ClientId,
97 pub remote_addr: SocketAddr,
99 pub connected_at: Instant,
101 pub last_activity: Instant,
103}
104
105impl ClientSession {
106 pub fn new(client_id: ClientId, remote_addr: SocketAddr) -> Self {
108 let now = Instant::now();
109 Self {
110 session_id: SessionId::new(),
111 client_id,
112 remote_addr,
113 connected_at: now,
114 last_activity: now,
115 }
116 }
117
118 pub fn update_activity(&mut self) {
120 self.last_activity = Instant::now();
121 }
122
123 pub fn duration(&self) -> Duration {
125 Instant::now().duration_since(self.connected_at)
126 }
127}
128
129#[derive(Debug)]
145pub struct MessageBroker {
146 config: BrokerConfig,
148 storage: Arc<StorageEngine>,
150 server: Option<BrokerServer>,
152 topic_manager: Arc<TopicManager>,
154 producer: Arc<ProducerService>,
156 consumer: Arc<ConsumerService>,
158 offset_manager: Arc<OffsetManager>,
160 sessions: Arc<tokio::sync::RwLock<HashMap<SessionId, ClientSession>>>,
162}
163
164impl MessageBroker {
165 pub async fn new(config: BrokerConfig) -> BrokerResult<Self> {
173 let storage_config = kawadb_storage::StorageConfig {
175 data_dir: config.data_dir().clone(),
176 segment_size: config.segment_size(),
177 sync_interval_ms: config.sync_interval_ms(),
178 enable_compression: false,
179 memory_pool_size: 128 * 1024 * 1024, batch_size: 1000,
181 worker_count: None, };
183 let storage = Arc::new(StorageEngine::new(storage_config).await?);
184
185 let offset_path = config.data_dir().join("offsets");
187 let offset_manager = Arc::new(OffsetManager::new(offset_path).await?);
188
189 let topic_manager = Arc::new(TopicManager::new(storage.clone()));
191
192 let producer = Arc::new(ProducerService::new(
194 storage.clone(),
195 topic_manager.clone(),
196 ));
197 let consumer = Arc::new(ConsumerService::new(
198 storage.clone(),
199 topic_manager.clone(),
200 offset_manager.clone(),
201 ));
202
203 info!("MessageBroker initialized with storage: {:?}", config.data_dir());
204
205 Ok(Self {
206 config,
207 storage,
208 server: None,
209 topic_manager,
210 producer,
211 consumer,
212 offset_manager,
213 sessions: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
214 })
215 }
216
217 pub async fn start(&mut self) -> BrokerResult<SocketAddr> {
222 let server = BrokerServer::new(
223 self.config.clone(),
224 self.storage.clone(),
225 ).await?;
226
227 let addr = server.start_and_get_addr().await?;
228 self.server = Some(server);
229
230 info!("MessageBroker started on {}", addr);
231 Ok(addr)
232 }
233
234 pub async fn stop(&mut self) -> BrokerResult<()> {
236 if let Some(mut server) = self.server.take() {
237 server.stop().await?;
238 }
239
240 self.storage.shutdown().await?;
242
243 info!("MessageBroker stopped");
244 Ok(())
245 }
246
247 pub async fn produce_message(
258 &self,
259 topic: &str,
260 partition: u32,
261 message: &[u8],
262 session_id: SessionId,
263 ) -> BrokerResult<i64> {
264 self.producer.produce_message(topic, partition, message, session_id).await
265 }
266
267 pub async fn fetch_messages(
279 &self,
280 topic: &str,
281 partition: u32,
282 offset: i64,
283 max_messages: usize,
284 session_id: SessionId,
285 ) -> BrokerResult<Vec<ConsumerMessage>> {
286 self.consumer.fetch_messages(topic, partition, offset, max_messages, session_id).await
287 }
288
289 pub async fn create_topic(
299 &self,
300 topic: &str,
301 partition_count: u32,
302 replication_factor: u16,
303 ) -> BrokerResult<()> {
304 self.topic_manager.create_topic(topic.to_string(), partition_count, replication_factor).await
305 }
306
307 pub async fn delete_topic(&self, topic: &str) -> BrokerResult<()> {
315 self.topic_manager.delete_topic(topic).await
316 }
317
318 pub async fn list_topics(&self) -> BrokerResult<Vec<String>> {
323 let topics = self.topic_manager.list_topics().await?;
324 Ok(topics.into_iter().map(|t| t.name).collect())
325 }
326
327 pub async fn create_consumer_group(&self, group_id: &str, protocol: &str) -> BrokerResult<()> {
336 self.offset_manager.create_group(group_id, protocol).await
337 }
338
339 pub async fn get_stats(&self) -> BrokerResult<BrokerStats> {
344 let offset_stats = self.offset_manager.get_stats().await?;
345 let topics = self.topic_manager.list_topics().await?;
346 let sessions = self.sessions.read().await;
347
348 let stats = BrokerStats {
349 active_sessions: sessions.len(),
350 total_topics: topics.len(),
351 total_consumer_groups: offset_stats.total_groups,
352 active_consumer_groups: offset_stats.active_groups,
353 total_offsets: offset_stats.total_offsets,
354 uptime_seconds: 0, };
356
357 Ok(stats)
358 }
359
360 pub async fn register_session(&self, session: ClientSession) {
362 let session_id = session.session_id;
363 let mut sessions = self.sessions.write().await;
364 sessions.insert(session_id, session);
365 debug!("Session registered: {}", session_id);
366 }
367
368 pub async fn unregister_session(&self, session_id: SessionId) {
370 let mut sessions = self.sessions.write().await;
371 if sessions.remove(&session_id).is_some() {
372 debug!("Session unregistered: {}", session_id);
373 }
374 }
375}
376
377#[derive(Debug, Clone, Serialize, Deserialize)]
379pub struct BrokerStats {
380 pub active_sessions: usize,
382 pub total_topics: usize,
384 pub total_consumer_groups: usize,
386 pub active_consumer_groups: usize,
388 pub total_offsets: usize,
390 pub uptime_seconds: u64,
392}