fuse_rule/
ingestion.rs

1use crate::RuleEngine;
2use anyhow::{Context, Result};
3use arrow_json::ReaderBuilder;
4use std::io::Cursor;
5use std::sync::Arc;
6use tokio::net::TcpStream;
7use tokio::sync::RwLock;
8use tokio_tungstenite::WebSocketStream;
9use tracing::{debug, error, info};
10
/// Handle to the rule engine shared by the ingestion sources: an [`Arc`] around a
/// Tokio [`RwLock`], so it can be cloned into tasks and locked across `.await` points.
pub type SharedEngine = Arc<RwLock<RuleEngine>>;
12
/// Kafka consumer for ingesting events
///
/// Holds the connection settings for one consumer plus the shared engine that
/// decoded record batches are handed to.
pub struct KafkaIngestion {
    /// Engine that receives the decoded record batches.
    engine: SharedEngine,
    /// Broker addresses; joined with `,` to form `bootstrap.servers`.
    brokers: Vec<String>,
    /// Topic the consumer subscribes to.
    topic: String,
    /// Consumer group id (`group.id`).
    group_id: String,
    /// Value used for the consumer's `enable.auto.commit` setting.
    auto_commit: bool,
}
21
22impl KafkaIngestion {
23    pub fn new(
24        engine: SharedEngine,
25        brokers: Vec<String>,
26        topic: String,
27        group_id: String,
28        auto_commit: bool,
29    ) -> Self {
30        Self {
31            engine,
32            brokers,
33            topic,
34            group_id,
35            auto_commit,
36        }
37    }
38
39    pub async fn run(&self) -> Result<()> {
40        use futures::StreamExt;
41        use rdkafka::config::ClientConfig;
42        use rdkafka::consumer::{Consumer, StreamConsumer};
43        use rdkafka::Message;
44
45        info!(
46            brokers = ?self.brokers,
47            topic = %self.topic,
48            group_id = %self.group_id,
49            "Starting Kafka consumer"
50        );
51
52        // Create Kafka consumer
53        let consumer: StreamConsumer = ClientConfig::new()
54            .set("bootstrap.servers", self.brokers.join(","))
55            .set("group.id", &self.group_id)
56            .set("enable.partition.eof", "false")
57            .set("session.timeout.ms", "6000")
58            .set(
59                "enable.auto.commit",
60                if self.auto_commit { "true" } else { "false" },
61            )
62            .set("auto.offset.reset", "earliest")
63            .create()
64            .context("Failed to create Kafka consumer")?;
65
66        consumer
67            .subscribe(&[&self.topic])
68            .context("Failed to subscribe to Kafka topic")?;
69
70        info!("Kafka consumer subscribed to topic: {}", self.topic);
71
72        // Process messages
73        let mut message_stream = consumer.stream();
74        while let Some(message_result) = message_stream.next().await {
75            match message_result {
76                Ok(message) => {
77                    if let Some(payload) = message.payload() {
78                        match self.process_message(payload).await {
79                            Ok(_) => {
80                                debug!(
81                                    partition = ?message.partition(),
82                                    offset = ?message.offset(),
83                                    "Processed Kafka message"
84                                );
85                            }
86                            Err(e) => {
87                                error!(
88                                    error = %e,
89                                    partition = ?message.partition(),
90                                    offset = ?message.offset(),
91                                    "Failed to process Kafka message"
92                                );
93                            }
94                        }
95                    }
96                }
97                Err(e) => {
98                    error!(error = %e, "Kafka message error");
99                }
100            }
101        }
102
103        Ok(())
104    }
105
106    async fn process_message(&self, payload: &[u8]) -> Result<()> {
107        // Try to parse as JSON
108        let json_value: serde_json::Value =
109            serde_json::from_slice(payload).context("Failed to parse Kafka message as JSON")?;
110
111        // Convert to RecordBatch
112        let json_data = serde_json::to_vec(&json_value)?;
113        let cursor = Cursor::new(json_data);
114
115        let engine_lock = self.engine.read().await;
116        let schema = engine_lock.schema();
117        drop(engine_lock);
118
119        let reader = ReaderBuilder::new(schema.clone())
120            .build(cursor)
121            .context("Failed to create JSON reader")?;
122
123        // Process all batches from the reader
124        for batch_result in reader {
125            match batch_result {
126                Ok(batch) => {
127                    let mut engine_lock = self.engine.write().await;
128                    match engine_lock.process_batch(&batch).await {
129                        Ok(_traces) => {
130                            debug!(rows = batch.num_rows(), "Processed batch from Kafka");
131                        }
132                        Err(e) => {
133                            error!(error = %e, "Failed to process batch from Kafka");
134                        }
135                    }
136                }
137                Err(e) => {
138                    error!(error = %e, "Failed to read batch from Kafka message");
139                }
140            }
141        }
142
143        Ok(())
144    }
145}
146
/// WebSocket server for ingesting events
///
/// Holds the listener settings plus the shared engine that decoded record
/// batches are handed to.
pub struct WebSocketIngestion {
    /// Engine that receives the decoded record batches.
    engine: SharedEngine,
    /// Address passed to `TcpListener::bind`.
    bind: String,
    /// Configured connection limit; reported in the startup log.
    max_connections: usize,
}
153
154impl WebSocketIngestion {
155    pub fn new(engine: SharedEngine, bind: String, max_connections: usize) -> Self {
156        Self {
157            engine,
158            bind,
159            max_connections,
160        }
161    }
162
163    pub async fn run(&self) -> Result<()> {
164        use tokio::net::TcpListener;
165        use tokio_tungstenite::accept_async;
166
167        info!(
168            bind = %self.bind,
169            max_connections = self.max_connections,
170            "Starting WebSocket server"
171        );
172
173        let listener = TcpListener::bind(&self.bind).await?;
174        info!("WebSocket server listening on {}", self.bind);
175
176        while let Ok((stream, addr)) = listener.accept().await {
177            let engine = self.engine.clone();
178            tokio::spawn(async move {
179                info!(client = %addr, "New WebSocket connection");
180                match accept_async(stream).await {
181                    Ok(ws_stream) => {
182                        if let Err(e) = handle_websocket_stream(ws_stream, engine).await {
183                            error!(error = %e, "WebSocket handler error");
184                        }
185                    }
186                    Err(e) => {
187                        error!(error = %e, "Failed to accept WebSocket connection");
188                    }
189                }
190            });
191        }
192
193        Ok(())
194    }
195}
196
197async fn handle_websocket_stream(
198    stream: WebSocketStream<TcpStream>,
199    engine: SharedEngine,
200) -> Result<()> {
201    use arrow_json::ReaderBuilder;
202    use futures::{SinkExt, StreamExt};
203    use std::io::Cursor;
204    use tokio_tungstenite::tungstenite::Message;
205
206    let (mut sender, mut receiver) = stream.split();
207
208    while let Some(msg) = receiver.next().await {
209        match msg {
210            Ok(Message::Text(text)) => {
211                // Try to parse as JSON
212                match serde_json::from_str::<serde_json::Value>(&text) {
213                    Ok(json_value) => {
214                        // Convert to RecordBatch
215                        let json_data = match serde_json::to_vec(&json_value) {
216                            Ok(data) => data,
217                            Err(e) => {
218                                error!(error = %e, "Failed to serialize JSON");
219                                continue;
220                            }
221                        };
222                        let cursor = Cursor::new(json_data);
223
224                        let engine_lock = engine.read().await;
225                        let schema = engine_lock.schema();
226                        drop(engine_lock);
227
228                        let reader = match ReaderBuilder::new(schema.clone()).build(cursor) {
229                            Ok(r) => r,
230                            Err(e) => {
231                                error!(error = %e, "Failed to create JSON reader");
232                                continue;
233                            }
234                        };
235
236                        // Process all batches
237                        for batch_result in reader {
238                            match batch_result {
239                                Ok(batch) => {
240                                    let mut engine_lock = engine.write().await;
241                                    match engine_lock.process_batch(&batch).await {
242                                        Ok(_traces) => {
243                                            debug!(
244                                                rows = batch.num_rows(),
245                                                "Processed batch from WebSocket"
246                                            );
247                                        }
248                                        Err(e) => {
249                                            error!(error = %e, "Failed to process batch from WebSocket");
250                                        }
251                                    }
252                                }
253                                Err(e) => {
254                                    error!(error = %e, "Failed to read batch from WebSocket message");
255                                }
256                            }
257                        }
258                    }
259                    Err(e) => {
260                        error!(error = %e, "Failed to parse WebSocket message as JSON");
261                    }
262                }
263            }
264            Ok(Message::Binary(data)) => {
265                // Try to parse binary as JSON
266                match serde_json::from_slice::<serde_json::Value>(&data) {
267                    Ok(json_value) => {
268                        // Same processing as text
269                        let json_data = match serde_json::to_vec(&json_value) {
270                            Ok(data) => data,
271                            Err(e) => {
272                                error!(error = %e, "Failed to serialize JSON");
273                                continue;
274                            }
275                        };
276                        let cursor = Cursor::new(json_data);
277
278                        let engine_lock = engine.read().await;
279                        let schema = engine_lock.schema();
280                        drop(engine_lock);
281
282                        let reader = match ReaderBuilder::new(schema.clone()).build(cursor) {
283                            Ok(r) => r,
284                            Err(e) => {
285                                error!(error = %e, "Failed to create JSON reader");
286                                continue;
287                            }
288                        };
289
290                        for batch_result in reader {
291                            match batch_result {
292                                Ok(batch) => {
293                                    let mut engine_lock = engine.write().await;
294                                    if let Err(e) = engine_lock.process_batch(&batch).await {
295                                        error!(error = %e, "Failed to process batch from WebSocket");
296                                    }
297                                }
298                                Err(e) => {
299                                    error!(error = %e, "Failed to read batch from WebSocket message");
300                                }
301                            }
302                        }
303                    }
304                    Err(e) => {
305                        error!(error = %e, "Failed to parse WebSocket binary as JSON");
306                    }
307                }
308            }
309            Ok(Message::Close(_)) => {
310                info!("WebSocket connection closed");
311                break;
312            }
313            Ok(Message::Ping(data)) => {
314                // Respond with pong
315                if sender.send(Message::Pong(data)).await.is_err() {
316                    break;
317                }
318            }
319            Ok(Message::Pong(_)) => {
320                // Ignore pong
321            }
322            Ok(Message::Frame(_)) => {
323                // Ignore frames
324            }
325            Err(e) => {
326                error!(error = %e, "WebSocket error");
327                break;
328            }
329        }
330    }
331
332    info!("WebSocket connection ended");
333    Ok(())
334}