kawadb_broker/
lib.rs

1//! # Kawa Broker
2//!
3//! Kafka互換メッセージブローカーの実装。
4//! 高性能なイベントストリーミングプラットフォームを提供。
5
6pub mod client;
7pub mod config;
8pub mod error;
9pub mod protocol;
10pub mod server;
11pub mod messaging;
12
13
14// 公開エクスポート
15pub use config::BrokerConfig;
16pub use error::{BrokerError, BrokerResult};
17use messaging::{
18    TopicManager, ProducerService, ConsumerService, 
19    offset::OffsetManager,
20    consumer::ConsumerMessage
21};
22use kawadb_storage::StorageEngine;
23use server::BrokerServer;
24
25use derive_more::{Deref, DerefMut, From, Into};
26use serde::{Deserialize, Serialize};
27use std::{
28    collections::HashMap,
29    fmt::Debug,
30    net::SocketAddr,
31    sync::Arc,
32    time::{Duration, Instant},
33};
34use tracing::{debug, info};
35use uuid::Uuid;
36
37/// クライアントID型(Newtypeパターン)
38#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Deref, DerefMut, From, Into)]
39pub struct ClientId(pub String);
40
41impl ClientId {
42    /// 新しいクライアントIDを生成
43    pub fn new() -> Self {
44        Self(Uuid::new_v4().to_string())
45    }
46    
47    /// 文字列からクライアントIDを作成
48    pub fn from_string(id: impl Into<String>) -> Self {
49        Self(id.into())
50    }
51}
52
53impl Default for ClientId {
54    fn default() -> Self {
55        Self::new()
56    }
57}
58
59impl std::fmt::Display for ClientId {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        write!(f, "{}", self.0)
62    }
63}
64
65/// セッションID型(Newtypeパターン)
66#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Deref, DerefMut, From, Into)]
67pub struct SessionId(pub u64);
68
69impl SessionId {
70    /// 新しいセッションIDを生成
71    pub fn new() -> Self {
72        use std::sync::atomic::{AtomicU64, Ordering};
73        static COUNTER: AtomicU64 = AtomicU64::new(1);
74        Self(COUNTER.fetch_add(1, Ordering::Relaxed))
75    }
76}
77
78impl Default for SessionId {
79    fn default() -> Self {
80        Self::new()
81    }
82}
83
84impl std::fmt::Display for SessionId {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        write!(f, "{}", self.0)
87    }
88}
89
90/// クライアントセッション情報
91#[derive(Debug, Clone)]
92pub struct ClientSession {
93    /// セッションID
94    pub session_id: SessionId,
95    /// クライアントID
96    pub client_id: ClientId,
97    /// リモートアドレス
98    pub remote_addr: SocketAddr,
99    /// 接続時刻
100    pub connected_at: Instant,
101    /// 最終アクティビティ時刻
102    pub last_activity: Instant,
103}
104
105impl ClientSession {
106    /// 新しいクライアントセッションを作成
107    pub fn new(client_id: ClientId, remote_addr: SocketAddr) -> Self {
108        let now = Instant::now();
109        Self {
110            session_id: SessionId::new(),
111            client_id,
112            remote_addr,
113            connected_at: now,
114            last_activity: now,
115        }
116    }
117    
118    /// 最終アクティビティ時刻を更新
119    pub fn update_activity(&mut self) {
120        self.last_activity = Instant::now();
121    }
122    
123    /// セッションの持続時間を取得
124    pub fn duration(&self) -> Duration {
125        Instant::now().duration_since(self.connected_at)
126    }
127}
128
129/// Kawa メッセージブローカー
130/// 
131/// Kafka互換のメッセージブローカーサービスを提供。
132/// Producer/Consumer API、トピック管理、オフセット管理を統合。
133/// 
134/// # 機能
135/// - Kafka互換プロトコル処理
136/// - 高性能メッセージ永続化
137/// - コンシューマーグループ管理
138/// - メトリクス・モニタリング
139/// 
140/// # @todo
141/// - [ ] クラスター機能
142/// - [ ] レプリケーション
143/// - [ ] 管理API
144#[derive(Debug)]
145pub struct MessageBroker {
146    /// ブローカー設定
147    config: BrokerConfig,
148    /// ストレージエンジン
149    storage: Arc<StorageEngine>,
150    /// TCPサーバー
151    server: Option<BrokerServer>,
152    /// トピック管理
153    topic_manager: Arc<TopicManager>,
154    /// Producer サービス
155    producer: Arc<ProducerService>,
156    /// Consumer サービス
157    consumer: Arc<ConsumerService>,
158    /// オフセット管理
159    offset_manager: Arc<OffsetManager>,
160    /// アクティブセッション
161    sessions: Arc<tokio::sync::RwLock<HashMap<SessionId, ClientSession>>>,
162}
163
164impl MessageBroker {
165    /// 新しいメッセージブローカーを作成
166    /// 
167    /// # Arguments
168    /// * `config` - ブローカー設定
169    /// 
170    /// # Returns
171    /// * `Result<Self>` - ブローカーインスタンス
172    pub async fn new(config: BrokerConfig) -> BrokerResult<Self> {
173        // ストレージエンジンを初期化
174        let storage_config = kawadb_storage::StorageConfig {
175            data_dir: config.data_dir().clone(),
176            segment_size: config.segment_size(),
177            sync_interval_ms: config.sync_interval_ms(),
178            enable_compression: false,
179            memory_pool_size: 128 * 1024 * 1024, // 128MB
180            batch_size: 1000,
181            worker_count: None, // 自動設定
182        };
183        let storage = Arc::new(StorageEngine::new(storage_config).await?);
184        
185        // オフセット管理を初期化
186        let offset_path = config.data_dir().join("offsets");
187        let offset_manager = Arc::new(OffsetManager::new(offset_path).await?);
188        
189        // トピック管理を初期化
190        let topic_manager = Arc::new(TopicManager::new(storage.clone()));
191        
192        // Producer/Consumer サービスを初期化
193        let producer = Arc::new(ProducerService::new(
194            storage.clone(),
195            topic_manager.clone(),
196        ));
197        let consumer = Arc::new(ConsumerService::new(
198            storage.clone(),
199            topic_manager.clone(),
200            offset_manager.clone(),
201        ));
202        
203        info!("MessageBroker initialized with storage: {:?}", config.data_dir());
204        
205        Ok(Self {
206            config,
207            storage,
208            server: None,
209            topic_manager,
210            producer,
211            consumer,
212            offset_manager,
213            sessions: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
214        })
215    }
216    
217    /// ブローカーを開始
218    /// 
219    /// # Returns
220    /// * `Result<SocketAddr>` - 実際のバインドアドレス
221    pub async fn start(&mut self) -> BrokerResult<SocketAddr> {
222        let server = BrokerServer::new(
223            self.config.clone(),
224            self.storage.clone(),
225        ).await?;
226        
227        let addr = server.start_and_get_addr().await?;
228        self.server = Some(server);
229        
230        info!("MessageBroker started on {}", addr);
231        Ok(addr)
232    }
233    
234    /// ブローカーを停止
235    pub async fn stop(&mut self) -> BrokerResult<()> {
236        if let Some(mut server) = self.server.take() {
237            server.stop().await?;
238        }
239        
240        // ストレージを優雅にシャットダウン
241        self.storage.shutdown().await?;
242        
243        info!("MessageBroker stopped");
244        Ok(())
245    }
246    
247    /// メッセージを送信(Producer API)
248    /// 
249    /// # Arguments
250    /// * `topic` - トピック名
251    /// * `partition` - パーティション番号
252    /// * `message` - メッセージデータ
253    /// * `session_id` - セッションID
254    /// 
255    /// # Returns
256    /// * `BrokerResult<i64>` - 割り当てられたオフセット
257    pub async fn produce_message(
258        &self,
259        topic: &str,
260        partition: u32,
261        message: &[u8],
262        session_id: SessionId,
263    ) -> BrokerResult<i64> {
264        self.producer.produce_message(topic, partition, message, session_id).await
265    }
266    
267    /// メッセージを取得(Consumer API)
268    /// 
269    /// # Arguments
270    /// * `topic` - トピック名
271    /// * `partition` - パーティション番号
272    /// * `offset` - 開始オフセット
273    /// * `max_messages` - 最大取得メッセージ数
274    /// * `session_id` - セッションID
275    /// 
276    /// # Returns
277    /// * `BrokerResult<Vec<ConsumerMessage>>` - 取得したメッセージリスト
278    pub async fn fetch_messages(
279        &self,
280        topic: &str,
281        partition: u32,
282        offset: i64,
283        max_messages: usize,
284        session_id: SessionId,
285    ) -> BrokerResult<Vec<ConsumerMessage>> {
286        self.consumer.fetch_messages(topic, partition, offset, max_messages, session_id).await
287    }
288    
289    /// トピックを作成
290    /// 
291    /// # Arguments
292    /// * `topic` - トピック名
293    /// * `partition_count` - パーティション数
294    /// * `replication_factor` - レプリケーション係数
295    /// 
296    /// # Returns
297    /// * `BrokerResult<()>` - 成功または失敗
298    pub async fn create_topic(
299        &self,
300        topic: &str,
301        partition_count: u32,
302        replication_factor: u16,
303    ) -> BrokerResult<()> {
304        self.topic_manager.create_topic(topic.to_string(), partition_count, replication_factor).await
305    }
306    
307    /// トピックを削除
308    /// 
309    /// # Arguments
310    /// * `topic` - トピック名
311    /// 
312    /// # Returns
313    /// * `BrokerResult<()>` - 成功または失敗
314    pub async fn delete_topic(&self, topic: &str) -> BrokerResult<()> {
315        self.topic_manager.delete_topic(topic).await
316    }
317    
318    /// トピック一覧を取得
319    /// 
320    /// # Returns
321    /// * `BrokerResult<Vec<String>>` - トピック名一覧
322    pub async fn list_topics(&self) -> BrokerResult<Vec<String>> {
323        let topics = self.topic_manager.list_topics().await?;
324        Ok(topics.into_iter().map(|t| t.name).collect())
325    }
326    
327    /// コンシューマーグループを作成
328    /// 
329    /// # Arguments
330    /// * `group_id` - グループID
331    /// * `protocol` - プロトコル名
332    /// 
333    /// # Returns
334    /// * `BrokerResult<()>` - 成功または失敗
335    pub async fn create_consumer_group(&self, group_id: &str, protocol: &str) -> BrokerResult<()> {
336        self.offset_manager.create_group(group_id, protocol).await
337    }
338    
339    /// ブローカー統計を取得
340    /// 
341    /// # Returns
342    /// * `BrokerResult<BrokerStats>` - ブローカー統計
343    pub async fn get_stats(&self) -> BrokerResult<BrokerStats> {
344        let offset_stats = self.offset_manager.get_stats().await?;
345        let topics = self.topic_manager.list_topics().await?;
346        let sessions = self.sessions.read().await;
347        
348        let stats = BrokerStats {
349            active_sessions: sessions.len(),
350            total_topics: topics.len(),
351            total_consumer_groups: offset_stats.total_groups,
352            active_consumer_groups: offset_stats.active_groups,
353            total_offsets: offset_stats.total_offsets,
354            uptime_seconds: 0, // @todo: 実装
355        };
356        
357        Ok(stats)
358    }
359    
360    /// クライアントセッションを登録
361    pub async fn register_session(&self, session: ClientSession) {
362        let session_id = session.session_id;
363        let mut sessions = self.sessions.write().await;
364        sessions.insert(session_id, session);
365        debug!("Session registered: {}", session_id);
366    }
367    
368    /// クライアントセッションを削除
369    pub async fn unregister_session(&self, session_id: SessionId) {
370        let mut sessions = self.sessions.write().await;
371        if sessions.remove(&session_id).is_some() {
372            debug!("Session unregistered: {}", session_id);
373        }
374    }
375}
376
377/// ブローカー統計
378#[derive(Debug, Clone, Serialize, Deserialize)]
379pub struct BrokerStats {
380    /// アクティブセッション数
381    pub active_sessions: usize,
382    /// 総トピック数
383    pub total_topics: usize,
384    /// 総コンシューマーグループ数
385    pub total_consumer_groups: usize,
386    /// アクティブコンシューマーグループ数
387    pub active_consumer_groups: usize,
388    /// 総オフセット数
389    pub total_offsets: usize,
390    /// 稼働時間(秒)
391    pub uptime_seconds: u64,
392}