1use crate::RuleEngine;
2use anyhow::{Context, Result};
3use arrow_json::ReaderBuilder;
4use std::io::Cursor;
5use std::sync::Arc;
6use tokio::net::TcpStream;
7use tokio::sync::RwLock;
8use tokio_tungstenite::WebSocketStream;
9use tracing::{debug, error, info};
10
/// Handle to a [`RuleEngine`] shared between ingestion tasks.
///
/// The engine sits behind an async `tokio::sync::RwLock` inside an `Arc`; the
/// ingestion code in this file takes the read lock to fetch the schema and the
/// write lock to process a batch.
pub type SharedEngine = Arc<RwLock<RuleEngine>>;
12
13pub struct KafkaIngestion {
15 engine: SharedEngine,
16 brokers: Vec<String>,
17 topic: String,
18 group_id: String,
19 auto_commit: bool,
20}
21
22impl KafkaIngestion {
23 pub fn new(
24 engine: SharedEngine,
25 brokers: Vec<String>,
26 topic: String,
27 group_id: String,
28 auto_commit: bool,
29 ) -> Self {
30 Self {
31 engine,
32 brokers,
33 topic,
34 group_id,
35 auto_commit,
36 }
37 }
38
39 pub async fn run(&self) -> Result<()> {
40 use futures::StreamExt;
41 use rdkafka::config::ClientConfig;
42 use rdkafka::consumer::{Consumer, StreamConsumer};
43 use rdkafka::Message;
44
45 info!(
46 brokers = ?self.brokers,
47 topic = %self.topic,
48 group_id = %self.group_id,
49 "Starting Kafka consumer"
50 );
51
52 let consumer: StreamConsumer = ClientConfig::new()
54 .set("bootstrap.servers", self.brokers.join(","))
55 .set("group.id", &self.group_id)
56 .set("enable.partition.eof", "false")
57 .set("session.timeout.ms", "6000")
58 .set(
59 "enable.auto.commit",
60 if self.auto_commit { "true" } else { "false" },
61 )
62 .set("auto.offset.reset", "earliest")
63 .create()
64 .context("Failed to create Kafka consumer")?;
65
66 consumer
67 .subscribe(&[&self.topic])
68 .context("Failed to subscribe to Kafka topic")?;
69
70 info!("Kafka consumer subscribed to topic: {}", self.topic);
71
72 let mut message_stream = consumer.stream();
74 while let Some(message_result) = message_stream.next().await {
75 match message_result {
76 Ok(message) => {
77 if let Some(payload) = message.payload() {
78 match self.process_message(payload).await {
79 Ok(_) => {
80 debug!(
81 partition = ?message.partition(),
82 offset = ?message.offset(),
83 "Processed Kafka message"
84 );
85 }
86 Err(e) => {
87 error!(
88 error = %e,
89 partition = ?message.partition(),
90 offset = ?message.offset(),
91 "Failed to process Kafka message"
92 );
93 }
94 }
95 }
96 }
97 Err(e) => {
98 error!(error = %e, "Kafka message error");
99 }
100 }
101 }
102
103 Ok(())
104 }
105
106 async fn process_message(&self, payload: &[u8]) -> Result<()> {
107 let json_value: serde_json::Value =
109 serde_json::from_slice(payload).context("Failed to parse Kafka message as JSON")?;
110
111 let json_data = serde_json::to_vec(&json_value)?;
113 let cursor = Cursor::new(json_data);
114
115 let engine_lock = self.engine.read().await;
116 let schema = engine_lock.schema();
117 drop(engine_lock);
118
119 let reader = ReaderBuilder::new(schema.clone())
120 .build(cursor)
121 .context("Failed to create JSON reader")?;
122
123 for batch_result in reader {
125 match batch_result {
126 Ok(batch) => {
127 let mut engine_lock = self.engine.write().await;
128 match engine_lock.process_batch(&batch).await {
129 Ok(_traces) => {
130 debug!(rows = batch.num_rows(), "Processed batch from Kafka");
131 }
132 Err(e) => {
133 error!(error = %e, "Failed to process batch from Kafka");
134 }
135 }
136 }
137 Err(e) => {
138 error!(error = %e, "Failed to read batch from Kafka message");
139 }
140 }
141 }
142
143 Ok(())
144 }
145}
146
147pub struct WebSocketIngestion {
149 engine: SharedEngine,
150 bind: String,
151 max_connections: usize,
152}
153
154impl WebSocketIngestion {
155 pub fn new(engine: SharedEngine, bind: String, max_connections: usize) -> Self {
156 Self {
157 engine,
158 bind,
159 max_connections,
160 }
161 }
162
163 pub async fn run(&self) -> Result<()> {
164 use tokio::net::TcpListener;
165 use tokio_tungstenite::accept_async;
166
167 info!(
168 bind = %self.bind,
169 max_connections = self.max_connections,
170 "Starting WebSocket server"
171 );
172
173 let listener = TcpListener::bind(&self.bind).await?;
174 info!("WebSocket server listening on {}", self.bind);
175
176 while let Ok((stream, addr)) = listener.accept().await {
177 let engine = self.engine.clone();
178 tokio::spawn(async move {
179 info!(client = %addr, "New WebSocket connection");
180 match accept_async(stream).await {
181 Ok(ws_stream) => {
182 if let Err(e) = handle_websocket_stream(ws_stream, engine).await {
183 error!(error = %e, "WebSocket handler error");
184 }
185 }
186 Err(e) => {
187 error!(error = %e, "Failed to accept WebSocket connection");
188 }
189 }
190 });
191 }
192
193 Ok(())
194 }
195}
196
197async fn handle_websocket_stream(
198 stream: WebSocketStream<TcpStream>,
199 engine: SharedEngine,
200) -> Result<()> {
201 use arrow_json::ReaderBuilder;
202 use futures::{SinkExt, StreamExt};
203 use std::io::Cursor;
204 use tokio_tungstenite::tungstenite::Message;
205
206 let (mut sender, mut receiver) = stream.split();
207
208 while let Some(msg) = receiver.next().await {
209 match msg {
210 Ok(Message::Text(text)) => {
211 match serde_json::from_str::<serde_json::Value>(&text) {
213 Ok(json_value) => {
214 let json_data = match serde_json::to_vec(&json_value) {
216 Ok(data) => data,
217 Err(e) => {
218 error!(error = %e, "Failed to serialize JSON");
219 continue;
220 }
221 };
222 let cursor = Cursor::new(json_data);
223
224 let engine_lock = engine.read().await;
225 let schema = engine_lock.schema();
226 drop(engine_lock);
227
228 let reader = match ReaderBuilder::new(schema.clone()).build(cursor) {
229 Ok(r) => r,
230 Err(e) => {
231 error!(error = %e, "Failed to create JSON reader");
232 continue;
233 }
234 };
235
236 for batch_result in reader {
238 match batch_result {
239 Ok(batch) => {
240 let mut engine_lock = engine.write().await;
241 match engine_lock.process_batch(&batch).await {
242 Ok(_traces) => {
243 debug!(
244 rows = batch.num_rows(),
245 "Processed batch from WebSocket"
246 );
247 }
248 Err(e) => {
249 error!(error = %e, "Failed to process batch from WebSocket");
250 }
251 }
252 }
253 Err(e) => {
254 error!(error = %e, "Failed to read batch from WebSocket message");
255 }
256 }
257 }
258 }
259 Err(e) => {
260 error!(error = %e, "Failed to parse WebSocket message as JSON");
261 }
262 }
263 }
264 Ok(Message::Binary(data)) => {
265 match serde_json::from_slice::<serde_json::Value>(&data) {
267 Ok(json_value) => {
268 let json_data = match serde_json::to_vec(&json_value) {
270 Ok(data) => data,
271 Err(e) => {
272 error!(error = %e, "Failed to serialize JSON");
273 continue;
274 }
275 };
276 let cursor = Cursor::new(json_data);
277
278 let engine_lock = engine.read().await;
279 let schema = engine_lock.schema();
280 drop(engine_lock);
281
282 let reader = match ReaderBuilder::new(schema.clone()).build(cursor) {
283 Ok(r) => r,
284 Err(e) => {
285 error!(error = %e, "Failed to create JSON reader");
286 continue;
287 }
288 };
289
290 for batch_result in reader {
291 match batch_result {
292 Ok(batch) => {
293 let mut engine_lock = engine.write().await;
294 if let Err(e) = engine_lock.process_batch(&batch).await {
295 error!(error = %e, "Failed to process batch from WebSocket");
296 }
297 }
298 Err(e) => {
299 error!(error = %e, "Failed to read batch from WebSocket message");
300 }
301 }
302 }
303 }
304 Err(e) => {
305 error!(error = %e, "Failed to parse WebSocket binary as JSON");
306 }
307 }
308 }
309 Ok(Message::Close(_)) => {
310 info!("WebSocket connection closed");
311 break;
312 }
313 Ok(Message::Ping(data)) => {
314 if sender.send(Message::Pong(data)).await.is_err() {
316 break;
317 }
318 }
319 Ok(Message::Pong(_)) => {
320 }
322 Ok(Message::Frame(_)) => {
323 }
325 Err(e) => {
326 error!(error = %e, "WebSocket error");
327 break;
328 }
329 }
330 }
331
332 info!("WebSocket connection ended");
333 Ok(())
334}