1use crate::evaluator::RuleEvaluator;
2use crate::RuleEngine;
3use arrow_json::ReaderBuilder;
4use axum::{
5 extract::{Json, Request, State},
6 http::{HeaderMap, StatusCode},
7 middleware::Next,
8 response::IntoResponse,
9 routing::{get, post},
10 Router,
11};
12use serde_json::Value;
13use std::collections::HashSet;
14use std::io::Cursor;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use tokio::sync::{Mutex, RwLock};
18use tracing::{debug, error, info, warn};
19use uuid::Uuid;
20
/// Handle to the rule engine shared by every HTTP handler in this file: the
/// engine sits behind a Tokio `RwLock` inside an `Arc`, and the router is
/// built with a clone of this handle as its state.
pub type SharedEngine = Arc<RwLock<RuleEngine>>;
22
/// Set of API keys accepted by [`auth_middleware`].
///
/// Keys are gathered from the configuration and from the `FUSERULE_API_KEY`
/// and `FUSERULE_API_KEYS` environment variables. When no key is configured
/// at all, [`ApiKeyAuth::is_empty`] reports `true`.
#[derive(Clone)]
pub struct ApiKeyAuth {
    keys: HashSet<String>,
}

impl ApiKeyAuth {
    /// Builds the key set from `config_keys` plus the environment.
    ///
    /// * `FUSERULE_API_KEY` contributes a single key.
    /// * `FUSERULE_API_KEYS` contributes a comma-separated list; each entry
    ///   is trimmed.
    ///
    /// Empty keys are ignored from every source, so an empty string can never
    /// become a valid credential.
    pub fn new(config_keys: Vec<String>) -> Self {
        // Config keys get the same "non-empty" filter the environment keys
        // get; otherwise a stray "" in the config would let an empty
        // `X-API-Key` header authenticate.
        let mut keys: HashSet<String> = config_keys
            .into_iter()
            .filter(|key| !key.is_empty())
            .collect();

        if let Ok(env_key) = std::env::var("FUSERULE_API_KEY") {
            if !env_key.is_empty() {
                keys.insert(env_key);
            }
        }

        if let Ok(env_keys) = std::env::var("FUSERULE_API_KEYS") {
            keys.extend(
                env_keys
                    .split(',')
                    .map(str::trim)
                    .filter(|key| !key.is_empty())
                    .map(str::to_owned),
            );
        }

        Self { keys }
    }

    /// Returns `true` when no API key is configured.
    pub fn is_empty(&self) -> bool {
        self.keys.is_empty()
    }

    /// Returns `true` when `api_key` is non-empty and is one of the
    /// configured keys.
    pub fn validate(&self, api_key: &str) -> bool {
        !api_key.is_empty() && self.keys.contains(api_key)
    }
}
67
68pub async fn auth_middleware(
70 State(auth): State<ApiKeyAuth>,
71 headers: HeaderMap,
72 request: Request,
73 next: Next,
74) -> Result<axum::response::Response, StatusCode> {
75 if auth.is_empty() {
77 return Ok(next.run(request).await);
78 }
79
80 let api_key = headers
82 .get("X-API-Key")
83 .and_then(|v| v.to_str().ok())
84 .ok_or(StatusCode::UNAUTHORIZED)?;
85
86 if !auth.validate(api_key) {
88 return Err(StatusCode::UNAUTHORIZED);
89 }
90
91 Ok(next.run(request).await)
92}
93
94struct RateLimiter {
96 tokens: Arc<Mutex<u32>>,
97 max_tokens: u32,
98 refill_interval: Duration,
99 last_refill: Arc<Mutex<Instant>>,
100}
101
102impl RateLimiter {
103 fn new(requests_per_second: u32) -> Self {
104 Self {
105 tokens: Arc::new(Mutex::new(requests_per_second)),
106 max_tokens: requests_per_second,
107 refill_interval: Duration::from_secs(1),
108 last_refill: Arc::new(Mutex::new(Instant::now())),
109 }
110 }
111
112 async fn allow(&self) -> bool {
113 let mut tokens = self.tokens.lock().await;
114 let mut last_refill = self.last_refill.lock().await;
115
116 let elapsed = last_refill.elapsed();
118 if elapsed >= self.refill_interval {
119 let refills = (elapsed.as_secs_f64() / self.refill_interval.as_secs_f64()) as u32;
120 *tokens = (*tokens + refills).min(self.max_tokens);
121 *last_refill = Instant::now();
122 }
123
124 if *tokens > 0 {
126 *tokens -= 1;
127 true
128 } else {
129 false
130 }
131 }
132}
133
134pub struct FuseRuleServer {
135 engine: SharedEngine,
136 config_path: String,
137 rate_limiter: Option<Arc<RateLimiter>>,
138 api_auth: ApiKeyAuth,
139}
140
141impl FuseRuleServer {
142 pub fn new(
143 engine: SharedEngine,
144 config_path: String,
145 rate_limit: Option<u32>,
146 api_keys: Vec<String>,
147 ) -> Self {
148 let rate_limiter = rate_limit.map(|rps| Arc::new(RateLimiter::new(rps)));
149 let api_auth = ApiKeyAuth::new(api_keys);
150 Self {
151 engine,
152 config_path,
153 rate_limiter,
154 api_auth,
155 }
156 }
157
158 pub async fn run(self, port: u16) -> anyhow::Result<()> {
159 let rate_limiter = self.rate_limiter.clone();
160 let api_auth = self.api_auth.clone();
161
162 let public_routes = Router::new()
164 .route("/status", get(handle_status))
165 .route("/health", get(handle_health))
166 .route("/metrics", get(handle_metrics));
167
168 let protected_routes = Router::new()
170 .route("/rules", get(handle_rules))
171 .route("/api/v1/rules", post(handle_create_rule))
172 .route("/api/v1/rules/validate", post(handle_validate_rule))
173 .route(
174 "/api/v1/rules/:rule_id",
175 axum::routing::put(handle_update_rule),
176 )
177 .route(
178 "/api/v1/rules/:rule_id",
179 axum::routing::patch(handle_patch_rule),
180 )
181 .route(
182 "/api/v1/rules/:rule_id",
183 axum::routing::delete(handle_delete_rule),
184 )
185 .route("/api/v1/state", get(handle_state))
186 .route("/api/v1/state/:rule_id", get(handle_rule_state))
187 .route(
188 "/ingest",
189 post(move |state, body| {
190 handle_ingest_with_rate_limit(state, body, rate_limiter.clone())
191 }),
192 )
193 .layer(axum::middleware::from_fn_with_state(
194 api_auth.clone(),
195 auth_middleware,
196 ));
197
198 let app = Router::new()
199 .merge(public_routes)
200 .merge(protected_routes)
201 .with_state(self.engine.clone());
202
203 let addr = format!("0.0.0.0:{}", port);
204 let listener = tokio::net::TcpListener::bind(&addr).await?;
205 info!("FuseRule Server running on http://{}", addr);
206
207 info!("Starting server with graceful shutdown handler");
208 axum::serve(listener, app)
209 .with_graceful_shutdown(shutdown_signal(
210 self.engine.clone(),
211 self.config_path.clone(),
212 ))
213 .await?;
214
215 info!("FuseRule Server shut down gracefully");
216 Ok(())
217 }
218}
219
/// JSON body accepted by the rule create, update and validate handlers.
#[derive(serde::Deserialize)]
struct CreateRuleRequest {
    /// Rule identifier; the update handler requires it to match the path id.
    id: String,
    /// Rule name, copied into the rule unchanged.
    name: String,
    /// Predicate text; handlers compile it against the engine schema.
    predicate: String,
    /// Name looked up in the engine's `agents` map by the validate handler.
    action: String,
    /// Optional window length, passed through to the rule unchanged.
    window_seconds: Option<u64>,
    /// Rule version; handlers fall back to `1` when absent.
    version: Option<u32>,
    /// Enabled flag; handlers fall back to `true` when absent.
    enabled: Option<bool>,
    /// When `true`, the create handler stops after a successful compile
    /// without adding the rule. Defaults to `false` when omitted.
    #[serde(default)]
    dry_run: bool,
}
234
235async fn handle_validate_rule(
236 State(engine): State<SharedEngine>,
237 Json(req): Json<CreateRuleRequest>,
238) -> impl IntoResponse {
239 use crate::rule::Rule;
240
241 let rule = Rule {
242 id: req.id.clone(),
243 name: req.name.clone(),
244 predicate: req.predicate.clone(),
245 action: req.action.clone(),
246 window_seconds: req.window_seconds,
247 version: req.version.unwrap_or(1),
248 enabled: req.enabled.unwrap_or(true),
249 };
250
251 let engine_lock = engine.read().await;
252 let schema = engine_lock.schema();
253 let evaluator = crate::evaluator::DataFusionEvaluator::new();
254
255 let mut errors = Vec::new();
256 let mut compiled = false;
257
258 match evaluator.compile(rule.clone(), &schema) {
260 Ok(_) => {
261 compiled = true;
262 }
263 Err(e) => {
264 errors.push(format!("Predicate compilation failed: {}", e));
265 }
266 }
267
268 if !req.action.is_empty() && !engine_lock.agents.contains_key(&req.action) {
270 errors.push(format!("Agent '{}' not found", req.action));
271 }
272
273 let valid = errors.is_empty() && compiled;
274
275 (
276 StatusCode::OK,
277 Json(serde_json::json!({
278 "valid": valid,
279 "compiled": compiled,
280 "errors": errors
281 })),
282 )
283}
284
285async fn handle_create_rule(
286 State(engine): State<SharedEngine>,
287 Json(req): Json<CreateRuleRequest>,
288) -> impl IntoResponse {
289 use crate::rule::Rule;
290
291 let rule = Rule {
292 id: req.id.clone(),
293 name: req.name.clone(),
294 predicate: req.predicate.clone(),
295 action: req.action.clone(),
296 window_seconds: req.window_seconds,
297 version: req.version.unwrap_or(1),
298 enabled: req.enabled.unwrap_or(true),
299 };
300
301 let engine_lock = engine.read().await;
303 let schema = engine_lock.schema();
304 let evaluator = crate::evaluator::DataFusionEvaluator::new();
305
306 match evaluator.compile(rule.clone(), &schema) {
307 Ok(_) => {
308 if req.dry_run {
309 return (
310 StatusCode::OK,
311 Json(serde_json::json!({
312 "message": "Rule validation successful",
313 "rule": rule,
314 "dry_run": true
315 })),
316 );
317 }
318 }
319 Err(e) => {
320 return (
321 StatusCode::BAD_REQUEST,
322 Json(serde_json::json!({
323 "error": "Rule compilation failed",
324 "message": e.to_string()
325 })),
326 );
327 }
328 }
329
330 drop(engine_lock);
331
332 let mut engine_lock = engine.write().await;
334 match engine_lock.add_rule(rule.clone()).await {
335 Ok(()) => (
336 StatusCode::CREATED,
337 Json(serde_json::json!({
338 "message": "Rule created successfully",
339 "rule": rule
340 })),
341 ),
342 Err(e) => (
343 StatusCode::INTERNAL_SERVER_ERROR,
344 Json(serde_json::json!({
345 "error": "Failed to add rule",
346 "message": e.to_string()
347 })),
348 ),
349 }
350}
351
352async fn handle_update_rule(
353 State(engine): State<SharedEngine>,
354 axum::extract::Path(rule_id): axum::extract::Path<String>,
355 Json(req): Json<CreateRuleRequest>,
356) -> impl IntoResponse {
357 use crate::rule::Rule;
358
359 if req.id != rule_id {
361 return (
362 StatusCode::BAD_REQUEST,
363 Json(serde_json::json!({
364 "error": "Rule ID in path does not match ID in body",
365 "path_id": rule_id,
366 "body_id": req.id
367 })),
368 );
369 }
370
371 let rule = Rule {
372 id: req.id.clone(),
373 name: req.name.clone(),
374 predicate: req.predicate.clone(),
375 action: req.action.clone(),
376 window_seconds: req.window_seconds,
377 version: req.version.unwrap_or(1),
378 enabled: req.enabled.unwrap_or(true),
379 };
380
381 let engine_lock = engine.read().await;
383 let schema = engine_lock.schema();
384 let evaluator = crate::evaluator::DataFusionEvaluator::new();
385
386 match evaluator.compile(rule.clone(), &schema) {
387 Ok(_) => {
388 }
390 Err(e) => {
391 return (
392 StatusCode::BAD_REQUEST,
393 Json(serde_json::json!({
394 "error": "Rule compilation failed",
395 "message": e.to_string()
396 })),
397 );
398 }
399 }
400
401 drop(engine_lock);
402
403 let mut engine_lock = engine.write().await;
405 match engine_lock.update_rule(&rule_id, rule.clone()).await {
406 Ok(()) => (
407 StatusCode::OK,
408 Json(serde_json::json!({
409 "message": "Rule updated successfully",
410 "rule": rule
411 })),
412 ),
413 Err(e) => (
414 StatusCode::NOT_FOUND,
415 Json(serde_json::json!({
416 "error": "Failed to update rule",
417 "message": e.to_string()
418 })),
419 ),
420 }
421}
422
/// JSON body accepted by the rule patch handler. Every field is optional; a
/// field that is absent leaves the corresponding rule attribute unchanged.
#[derive(serde::Deserialize)]
struct PatchRuleRequest {
    /// New enabled flag.
    enabled: Option<bool>,
    /// New action; the handler rejects names missing from the engine's
    /// `agents` map.
    action: Option<String>,
    /// New rule name.
    name: Option<String>,
    /// New predicate; the handler compiles it before accepting it.
    predicate: Option<String>,
    /// New window length. `None` means "not provided", so this request type
    /// cannot clear an existing window.
    window_seconds: Option<u64>,
}
431
432async fn handle_patch_rule(
433 State(engine): State<SharedEngine>,
434 axum::extract::Path(rule_id): axum::extract::Path<String>,
435 Json(req): Json<PatchRuleRequest>,
436) -> impl IntoResponse {
437 let mut engine_lock = engine.write().await;
438
439 let rule_idx = engine_lock.rules.iter().position(|r| r.rule.id == rule_id);
441 if rule_idx.is_none() {
442 return (
443 StatusCode::NOT_FOUND,
444 Json(serde_json::json!({
445 "error": "Rule not found",
446 "rule_id": rule_id
447 })),
448 );
449 }
450
451 let rule_idx = rule_idx.unwrap();
452 let mut updated_rule = engine_lock.rules[rule_idx].rule.clone();
453
454 if let Some(enabled) = req.enabled {
456 updated_rule.enabled = enabled;
457 }
458 if let Some(action) = req.action {
459 if !engine_lock.agents.contains_key(&action) {
461 return (
462 StatusCode::BAD_REQUEST,
463 Json(serde_json::json!({
464 "error": "Agent not found",
465 "action": action
466 })),
467 );
468 }
469 updated_rule.action = action;
470 }
471 if let Some(name) = req.name {
472 updated_rule.name = name;
473 }
474 if let Some(predicate) = req.predicate {
475 let schema = engine_lock.schema();
477 let evaluator = crate::evaluator::DataFusionEvaluator::new();
478 let test_rule = crate::rule::Rule {
479 id: updated_rule.id.clone(),
480 name: updated_rule.name.clone(),
481 predicate: predicate.clone(),
482 action: updated_rule.action.clone(),
483 window_seconds: updated_rule.window_seconds,
484 version: updated_rule.version,
485 enabled: updated_rule.enabled,
486 };
487 if evaluator.compile(test_rule, &schema).is_err() {
488 return (
489 StatusCode::BAD_REQUEST,
490 Json(serde_json::json!({
491 "error": "Invalid predicate",
492 "predicate": predicate
493 })),
494 );
495 }
496 updated_rule.predicate = predicate;
497 }
498 if let Some(window_seconds) = req.window_seconds {
499 updated_rule.window_seconds = Some(window_seconds);
500 }
501
502 match engine_lock
504 .update_rule(&rule_id, updated_rule.clone())
505 .await
506 {
507 Ok(()) => (
508 StatusCode::OK,
509 Json(serde_json::json!({
510 "message": "Rule updated successfully",
511 "rule": updated_rule
512 })),
513 ),
514 Err(e) => (
515 StatusCode::INTERNAL_SERVER_ERROR,
516 Json(serde_json::json!({
517 "error": "Failed to update rule",
518 "message": e.to_string()
519 })),
520 ),
521 }
522}
523
524async fn handle_delete_rule(
525 State(engine): State<SharedEngine>,
526 axum::extract::Path(rule_id): axum::extract::Path<String>,
527) -> impl IntoResponse {
528 let mut engine_lock = engine.write().await;
529
530 let rule_idx = engine_lock.rules.iter().position(|r| r.rule.id == rule_id);
531 if let Some(idx) = rule_idx {
532 engine_lock.rules.remove(idx);
533 engine_lock.window_buffers.remove(&rule_id);
534 (
535 StatusCode::OK,
536 Json(serde_json::json!({
537 "message": "Rule deleted successfully",
538 "rule_id": rule_id
539 })),
540 )
541 } else {
542 (
543 StatusCode::NOT_FOUND,
544 Json(serde_json::json!({
545 "error": "Rule not found",
546 "rule_id": rule_id
547 })),
548 )
549 }
550}
551
552async fn handle_state(State(engine): State<SharedEngine>) -> impl IntoResponse {
555 let engine_lock = engine.read().await;
556 let mut states = Vec::new();
557
558 for rule in &engine_lock.rules {
559 let last_result = engine_lock
560 .state
561 .get_last_result(&rule.rule.id)
562 .await
563 .unwrap_or(crate::state::PredicateResult::False);
564
565 let window_size = engine_lock
566 .window_buffers
567 .get(&rule.rule.id)
568 .map(|b| {
569 b.get_batches()
570 .iter()
571 .map(|batch| batch.num_rows())
572 .sum::<usize>()
573 })
574 .unwrap_or(0);
575
576 states.push(serde_json::json!({
577 "rule_id": rule.rule.id,
578 "rule_name": rule.rule.name,
579 "current_state": match last_result {
580 crate::state::PredicateResult::True => "active",
581 crate::state::PredicateResult::False => "inactive",
582 },
583 "window_size": window_size,
584 "enabled": rule.rule.enabled,
585 }));
586 }
587
588 (
589 StatusCode::OK,
590 Json(serde_json::json!({ "states": states })),
591 )
592}
593
594async fn handle_rule_state(
595 State(engine): State<SharedEngine>,
596 axum::extract::Path(rule_id): axum::extract::Path<String>,
597) -> impl IntoResponse {
598 let engine_lock = engine.read().await;
599
600 let rule = engine_lock.rules.iter().find(|r| r.rule.id == rule_id);
602 if rule.is_none() {
603 return (
604 StatusCode::NOT_FOUND,
605 Json(serde_json::json!({
606 "error": "Rule not found",
607 "rule_id": rule_id
608 })),
609 );
610 }
611
612 let rule = rule.unwrap();
613 let last_result = engine_lock
614 .state
615 .get_last_result(&rule_id)
616 .await
617 .unwrap_or(crate::state::PredicateResult::False);
618
619 let last_transition = engine_lock
620 .state
621 .get_last_transition_time(&rule_id)
622 .await
623 .ok()
624 .flatten();
625
626 let window_size = engine_lock
627 .window_buffers
628 .get(&rule_id)
629 .map(|b| {
630 b.get_batches()
631 .iter()
632 .map(|batch| batch.num_rows())
633 .sum::<usize>()
634 })
635 .unwrap_or(0);
636
637 let metrics = crate::metrics::METRICS.snapshot();
639 let activation_count = metrics.rule_activations.get(&rule_id).copied().unwrap_or(0);
640
641 let mut response = serde_json::json!({
642 "rule_id": rule_id,
643 "rule_name": rule.rule.name,
644 "current_state": match last_result {
645 crate::state::PredicateResult::True => "active",
646 crate::state::PredicateResult::False => "inactive",
647 },
648 "activation_count": activation_count,
649 "window_size": window_size,
650 "enabled": rule.rule.enabled,
651 "timestamp": chrono::Utc::now().to_rfc3339(),
652 });
653
654 if let Some(transition_time) = last_transition {
655 response.as_object_mut().unwrap().insert(
656 "last_transition".to_string(),
657 serde_json::Value::String(transition_time.to_rfc3339()),
658 );
659 }
660
661 (StatusCode::OK, Json(response))
662}
663
664async fn shutdown_signal(engine: SharedEngine, config_path: String) {
665 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
667 info!("Setting up shutdown signal handlers");
668
669 #[cfg(unix)]
671 {
672 let engine_clone = engine.clone();
673 let config_path_clone = config_path.clone();
674 tokio::spawn(async move {
675 let mut stream = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup())
676 .expect("failed to install SIGHUP handler");
677 info!("SIGHUP handler installed");
678 loop {
680 if stream.recv().await.is_none() {
682 warn!("SIGHUP signal stream closed unexpectedly");
685 std::future::pending::<()>().await;
686 }
687
688 info!("SIGHUP received, reloading configuration...");
689 match crate::config::FuseRuleConfig::from_file(&config_path_clone) {
690 Ok(new_config) => {
691 let mut engine_lock = engine_clone.write().await;
692 if let Err(e) = engine_lock.reload_from_config(new_config).await {
693 error!("Failed to reload engine: {}", e);
694 }
695 }
696 Err(e) => {
697 error!("Failed to load config file for reload: {}", e);
698 }
699 }
700 }
701 });
702 }
703
704 let ctrl_c = async {
705 info!("Waiting for Ctrl+C signal...");
706 tokio::signal::ctrl_c()
707 .await
708 .expect("failed to install Ctrl+C handler");
709 info!("Termination signal received (Ctrl+C)");
710 };
711
712 #[cfg(unix)]
713 let terminate = async {
714 info!("Waiting for SIGTERM signal...");
715 let mut stream = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
716 .expect("failed to install signal handler");
717 info!("SIGTERM handler installed, waiting for signal...");
718 match stream.recv().await {
720 Some(_) => {
721 info!("Termination signal received (SIGTERM)");
722 }
723 None => {
724 warn!("SIGTERM signal stream closed unexpectedly - waiting indefinitely");
726 std::future::pending::<()>().await;
727 }
728 }
729 };
730
731 #[cfg(not(unix))]
732 let terminate = std::future::pending::<()>();
733
734 info!("Entering shutdown signal select loop - server will run until Ctrl+C or SIGTERM");
735 tokio::select! {
736 _ = ctrl_c => {
737 info!("Ctrl+C branch selected - shutting down");
738 },
739 _ = terminate => {
740 info!("SIGTERM branch selected - shutting down");
741 },
742 }
743 info!("Shutdown signal handler completed");
744}
745
746async fn handle_status() -> (StatusCode, Json<Value>) {
747 (
748 StatusCode::OK,
749 Json(serde_json::json!({ "status": "active" })),
750 )
751}
752
753async fn handle_health(State(engine): State<SharedEngine>) -> impl IntoResponse {
754 let engine_lock = engine.read().await;
755 let rule_count = engine_lock.rules.len();
756 let enabled_rules = engine_lock.rules.iter().filter(|r| r.rule.enabled).count();
757 let agent_count = engine_lock.agents.len();
758
759 let health = serde_json::json!({
760 "status": "healthy",
761 "engine": {
762 "rules_total": rule_count,
763 "rules_enabled": enabled_rules,
764 "rules_disabled": rule_count - enabled_rules,
765 "agents": agent_count,
766 },
767 "timestamp": chrono::Utc::now().to_rfc3339(),
768 });
769
770 (StatusCode::OK, Json(health))
771}
772
773async fn handle_rules(State(engine): State<SharedEngine>) -> impl IntoResponse {
774 let engine_lock = engine.read().await;
775 let rules: Vec<Value> = engine_lock
776 .rules
777 .iter()
778 .map(|r| {
779 serde_json::json!({
780 "id": r.rule.id,
781 "name": r.rule.name,
782 "predicate": r.rule.predicate,
783 "action": r.rule.action,
784 "window_seconds": r.rule.window_seconds,
785 "version": r.rule.version,
786 "enabled": r.rule.enabled,
787 })
788 })
789 .collect();
790
791 (StatusCode::OK, Json(serde_json::json!({ "rules": rules })))
792}
793
794async fn handle_metrics() -> String {
795 crate::metrics::METRICS.to_prometheus()
796}
797
798async fn handle_ingest_with_rate_limit(
799 State(engine): State<SharedEngine>,
800 Json(payload): Json<Value>,
801 rate_limiter: Option<Arc<RateLimiter>>,
802) -> (StatusCode, Json<Value>) {
803 if let Some(limiter) = rate_limiter {
805 if !limiter.allow().await {
806 return (
807 StatusCode::TOO_MANY_REQUESTS,
808 Json(serde_json::json!({
809 "error": "Rate limit exceeded",
810 "message": "Too many requests. Please try again later."
811 })),
812 );
813 }
814 }
815
816 handle_ingest(State(engine), Json(payload)).await
817}
818
819async fn handle_ingest(
820 State(engine): State<SharedEngine>,
821 Json(payload): Json<Value>,
822) -> (StatusCode, Json<Value>) {
823 let request_id = Uuid::new_v4().to_string();
824 debug!(request_id = %request_id, "Received ingest request");
825
826 let json_data = if payload.is_array() {
830 let array = payload.as_array().unwrap();
832 let mut ndjson = String::new();
833 for (i, item) in array.iter().enumerate() {
834 if i > 0 {
835 ndjson.push('\n');
836 }
837 ndjson.push_str(&serde_json::to_string(item).unwrap_or_default());
838 }
839 ndjson.into_bytes()
840 } else {
841 match serde_json::to_vec(&payload) {
843 Ok(data) => data,
844 Err(e) => {
845 error!(request_id = %request_id, error = %e, "Failed to serialize payload");
846 return (
847 StatusCode::BAD_REQUEST,
848 Json(serde_json::json!({
849 "error": format!("Invalid JSON payload: {}", e),
850 "request_id": request_id
851 })),
852 );
853 }
854 }
855 };
856 let cursor = Cursor::new(json_data);
857
858 let mut engine_lock = engine.write().await;
859 let schema = engine_lock.schema();
860
861 let reader = match ReaderBuilder::new(schema.clone()).build(cursor) {
863 Ok(r) => r,
864 Err(e) => {
865 error!(request_id = %request_id, error = %e, "Schema validation failed");
866 return (
867 StatusCode::BAD_REQUEST,
868 Json(serde_json::json!({
869 "error": format!("Schema validation failed: {}. Expected schema: {:?}", e, schema),
870 "request_id": request_id
871 })),
872 );
873 }
874 };
875
876 let mut all_traces = Vec::new();
877 let iter = reader.into_iter();
878 for batch_result in iter {
879 match batch_result {
880 Ok(batch) => {
881 debug!(
882 request_id = %request_id,
883 rows = batch.num_rows(),
884 "Ingested batch"
885 );
886
887 let batch_schema = batch.schema();
889 if batch_schema != schema {
890 let mut compatible = true;
892 let expected_fields: std::collections::HashSet<_> =
893 schema.fields().iter().map(|f| f.name()).collect();
894 let actual_fields: std::collections::HashSet<_> =
895 batch_schema.fields().iter().map(|f| f.name()).collect();
896
897 for field_name in &expected_fields {
899 if !actual_fields.contains(field_name) {
900 compatible = false;
901 break;
902 }
903 }
904
905 if compatible {
906 info!(
907 request_id = %request_id,
908 new_fields = ?actual_fields.difference(&expected_fields).collect::<Vec<_>>(),
909 "Schema evolution detected - new fields added"
910 );
911 } else {
914 warn!(
915 request_id = %request_id,
916 expected = ?schema,
917 actual = ?batch_schema,
918 "Schema mismatch - missing required fields"
919 );
920 }
921 }
922
923 match engine_lock.process_batch(&batch).await {
924 Ok(traces) => {
925 debug!(
926 request_id = %request_id,
927 trace_count = traces.len(),
928 "Engine processed batch"
929 );
930 let traces_with_id: Vec<Value> = traces
932 .into_iter()
933 .map(|t| {
934 let mut trace_json = serde_json::to_value(&t).unwrap();
935 if let Some(obj) = trace_json.as_object_mut() {
936 obj.insert(
937 "request_id".to_string(),
938 serde_json::json!(request_id),
939 );
940 }
941 trace_json
942 })
943 .collect();
944 all_traces.extend(traces_with_id);
945 }
946 Err(e) => {
947 error!(
948 request_id = %request_id,
949 error = %e,
950 "Engine processing error"
951 );
952 return (
953 StatusCode::INTERNAL_SERVER_ERROR,
954 Json(serde_json::json!({
955 "error": format!("Engine error: {}", e),
956 "request_id": request_id
957 })),
958 );
959 }
960 }
961 }
962 Err(e) => {
963 error!(
964 request_id = %request_id,
965 error = %e,
966 "Reader error"
967 );
968 return (
969 StatusCode::BAD_REQUEST,
970 Json(serde_json::json!({
971 "error": format!("JSON Reader error: {}", e),
972 "request_id": request_id
973 })),
974 );
975 }
976 }
977 }
978
979 (
980 StatusCode::OK,
981 Json(serde_json::json!({
982 "message": "Processed",
983 "request_id": request_id,
984 "traces": all_traces
985 })),
986 )
987}