// fuse_rule/server.rs

1use crate::evaluator::RuleEvaluator;
2use crate::RuleEngine;
3use arrow_json::ReaderBuilder;
4use axum::{
5    extract::{Json, Request, State},
6    http::{HeaderMap, StatusCode},
7    middleware::Next,
8    response::IntoResponse,
9    routing::{get, post},
10    Router,
11};
12use serde_json::Value;
13use std::collections::HashSet;
14use std::io::Cursor;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use tokio::sync::{Mutex, RwLock};
18use tracing::{debug, error, info, warn};
19use uuid::Uuid;
20
21pub type SharedEngine = Arc<RwLock<RuleEngine>>;
22
/// API-key authentication settings for the protected routes.
///
/// Holds the set of accepted keys. An empty set means authentication is
/// disabled and every request is let through.
#[derive(Clone)]
pub struct ApiKeyAuth {
    keys: HashSet<String>,
}

impl ApiKeyAuth {
    /// Builds the accepted key set from three sources, which are merged (no
    /// source overrides another):
    ///
    /// * `config_keys` — keys from the config file;
    /// * `FUSERULE_API_KEY` — a single key;
    /// * `FUSERULE_API_KEYS` — a comma-separated list of keys.
    ///
    /// Every key is trimmed of surrounding whitespace and blank keys are
    /// discarded. Without this, a blank entry would switch authentication on
    /// and then be matched by an empty `X-API-Key` header. Environment
    /// variables that are unset or not valid Unicode are ignored.
    pub fn new(config_keys: Vec<String>) -> Self {
        let single_env_key = std::env::var("FUSERULE_API_KEY").ok();
        let env_key_list = std::env::var("FUSERULE_API_KEYS").ok();

        let env_keys = single_env_key
            .iter()
            .map(String::as_str)
            .chain(env_key_list.iter().flat_map(|list| list.split(',')));

        let keys = config_keys
            .iter()
            .map(String::as_str)
            .chain(env_keys)
            .map(str::trim)
            .filter(|key| !key.is_empty())
            .map(str::to_owned)
            .collect();

        Self { keys }
    }

    /// Returns `true` when no keys are configured, i.e. authentication is off.
    pub fn is_empty(&self) -> bool {
        self.keys.is_empty()
    }

    /// Returns `true` if `api_key` exactly matches one of the configured keys.
    pub fn validate(&self, api_key: &str) -> bool {
        self.keys.contains(api_key)
    }
}
67
68/// Authentication middleware
69pub async fn auth_middleware(
70    State(auth): State<ApiKeyAuth>,
71    headers: HeaderMap,
72    request: Request,
73    next: Next,
74) -> Result<axum::response::Response, StatusCode> {
75    // If no API keys configured, allow all requests
76    if auth.is_empty() {
77        return Ok(next.run(request).await);
78    }
79
80    // Extract API key from header
81    let api_key = headers
82        .get("X-API-Key")
83        .and_then(|v| v.to_str().ok())
84        .ok_or(StatusCode::UNAUTHORIZED)?;
85
86    // Validate API key
87    if !auth.validate(api_key) {
88        return Err(StatusCode::UNAUTHORIZED);
89    }
90
91    Ok(next.run(request).await)
92}
93
94/// Simple token bucket rate limiter
95struct RateLimiter {
96    tokens: Arc<Mutex<u32>>,
97    max_tokens: u32,
98    refill_interval: Duration,
99    last_refill: Arc<Mutex<Instant>>,
100}
101
102impl RateLimiter {
103    fn new(requests_per_second: u32) -> Self {
104        Self {
105            tokens: Arc::new(Mutex::new(requests_per_second)),
106            max_tokens: requests_per_second,
107            refill_interval: Duration::from_secs(1),
108            last_refill: Arc::new(Mutex::new(Instant::now())),
109        }
110    }
111
112    async fn allow(&self) -> bool {
113        let mut tokens = self.tokens.lock().await;
114        let mut last_refill = self.last_refill.lock().await;
115
116        // Refill tokens based on elapsed time
117        let elapsed = last_refill.elapsed();
118        if elapsed >= self.refill_interval {
119            let refills = (elapsed.as_secs_f64() / self.refill_interval.as_secs_f64()) as u32;
120            *tokens = (*tokens + refills).min(self.max_tokens);
121            *last_refill = Instant::now();
122        }
123
124        // Consume a token if available
125        if *tokens > 0 {
126            *tokens -= 1;
127            true
128        } else {
129            false
130        }
131    }
132}
133
134pub struct FuseRuleServer {
135    engine: SharedEngine,
136    config_path: String,
137    rate_limiter: Option<Arc<RateLimiter>>,
138    api_auth: ApiKeyAuth,
139}
140
141impl FuseRuleServer {
142    pub fn new(
143        engine: SharedEngine,
144        config_path: String,
145        rate_limit: Option<u32>,
146        api_keys: Vec<String>,
147    ) -> Self {
148        let rate_limiter = rate_limit.map(|rps| Arc::new(RateLimiter::new(rps)));
149        let api_auth = ApiKeyAuth::new(api_keys);
150        Self {
151            engine,
152            config_path,
153            rate_limiter,
154            api_auth,
155        }
156    }
157
158    pub async fn run(self, port: u16) -> anyhow::Result<()> {
159        let rate_limiter = self.rate_limiter.clone();
160        let api_auth = self.api_auth.clone();
161
162        // Public routes (no auth required)
163        let public_routes = Router::new()
164            .route("/status", get(handle_status))
165            .route("/health", get(handle_health))
166            .route("/metrics", get(handle_metrics));
167
168        // Protected routes (require API key if configured)
169        let protected_routes = Router::new()
170            .route("/rules", get(handle_rules))
171            .route("/api/v1/rules", post(handle_create_rule))
172            .route("/api/v1/rules/validate", post(handle_validate_rule))
173            .route(
174                "/api/v1/rules/:rule_id",
175                axum::routing::put(handle_update_rule),
176            )
177            .route(
178                "/api/v1/rules/:rule_id",
179                axum::routing::patch(handle_patch_rule),
180            )
181            .route(
182                "/api/v1/rules/:rule_id",
183                axum::routing::delete(handle_delete_rule),
184            )
185            .route("/api/v1/state", get(handle_state))
186            .route("/api/v1/state/:rule_id", get(handle_rule_state))
187            .route(
188                "/ingest",
189                post(move |state, body| {
190                    handle_ingest_with_rate_limit(state, body, rate_limiter.clone())
191                }),
192            )
193            .layer(axum::middleware::from_fn_with_state(
194                api_auth.clone(),
195                auth_middleware,
196            ));
197
198        let app = Router::new()
199            .merge(public_routes)
200            .merge(protected_routes)
201            .with_state(self.engine.clone());
202
203        let addr = format!("0.0.0.0:{}", port);
204        let listener = tokio::net::TcpListener::bind(&addr).await?;
205        info!("FuseRule Server running on http://{}", addr);
206
207        info!("Starting server with graceful shutdown handler");
208        axum::serve(listener, app)
209            .with_graceful_shutdown(shutdown_signal(
210                self.engine.clone(),
211                self.config_path.clone(),
212            ))
213            .await?;
214
215        info!("FuseRule Server shut down gracefully");
216        Ok(())
217    }
218}
219
220// Dynamic Rule Management API
221
222#[derive(serde::Deserialize)]
223struct CreateRuleRequest {
224    id: String,
225    name: String,
226    predicate: String,
227    action: String,
228    window_seconds: Option<u64>,
229    version: Option<u32>,
230    enabled: Option<bool>,
231    #[serde(default)]
232    dry_run: bool,
233}
234
235async fn handle_validate_rule(
236    State(engine): State<SharedEngine>,
237    Json(req): Json<CreateRuleRequest>,
238) -> impl IntoResponse {
239    use crate::rule::Rule;
240
241    let rule = Rule {
242        id: req.id.clone(),
243        name: req.name.clone(),
244        predicate: req.predicate.clone(),
245        action: req.action.clone(),
246        window_seconds: req.window_seconds,
247        version: req.version.unwrap_or(1),
248        enabled: req.enabled.unwrap_or(true),
249    };
250
251    let engine_lock = engine.read().await;
252    let schema = engine_lock.schema();
253    let evaluator = crate::evaluator::DataFusionEvaluator::new();
254
255    let mut errors = Vec::new();
256    let mut compiled = false;
257
258    // Validate predicate compilation
259    match evaluator.compile(rule.clone(), &schema) {
260        Ok(_) => {
261            compiled = true;
262        }
263        Err(e) => {
264            errors.push(format!("Predicate compilation failed: {}", e));
265        }
266    }
267
268    // Validate agent exists (if action specified)
269    if !req.action.is_empty() && !engine_lock.agents.contains_key(&req.action) {
270        errors.push(format!("Agent '{}' not found", req.action));
271    }
272
273    let valid = errors.is_empty() && compiled;
274
275    (
276        StatusCode::OK,
277        Json(serde_json::json!({
278            "valid": valid,
279            "compiled": compiled,
280            "errors": errors
281        })),
282    )
283}
284
285async fn handle_create_rule(
286    State(engine): State<SharedEngine>,
287    Json(req): Json<CreateRuleRequest>,
288) -> impl IntoResponse {
289    use crate::rule::Rule;
290
291    let rule = Rule {
292        id: req.id.clone(),
293        name: req.name.clone(),
294        predicate: req.predicate.clone(),
295        action: req.action.clone(),
296        window_seconds: req.window_seconds,
297        version: req.version.unwrap_or(1),
298        enabled: req.enabled.unwrap_or(true),
299    };
300
301    // Validate rule by attempting to compile it
302    let engine_lock = engine.read().await;
303    let schema = engine_lock.schema();
304    let evaluator = crate::evaluator::DataFusionEvaluator::new();
305
306    match evaluator.compile(rule.clone(), &schema) {
307        Ok(_) => {
308            if req.dry_run {
309                return (
310                    StatusCode::OK,
311                    Json(serde_json::json!({
312                        "message": "Rule validation successful",
313                        "rule": rule,
314                        "dry_run": true
315                    })),
316                );
317            }
318        }
319        Err(e) => {
320            return (
321                StatusCode::BAD_REQUEST,
322                Json(serde_json::json!({
323                    "error": "Rule compilation failed",
324                    "message": e.to_string()
325                })),
326            );
327        }
328    }
329
330    drop(engine_lock);
331
332    // Add rule to engine
333    let mut engine_lock = engine.write().await;
334    match engine_lock.add_rule(rule.clone()).await {
335        Ok(()) => (
336            StatusCode::CREATED,
337            Json(serde_json::json!({
338                "message": "Rule created successfully",
339                "rule": rule
340            })),
341        ),
342        Err(e) => (
343            StatusCode::INTERNAL_SERVER_ERROR,
344            Json(serde_json::json!({
345                "error": "Failed to add rule",
346                "message": e.to_string()
347            })),
348        ),
349    }
350}
351
352async fn handle_update_rule(
353    State(engine): State<SharedEngine>,
354    axum::extract::Path(rule_id): axum::extract::Path<String>,
355    Json(req): Json<CreateRuleRequest>,
356) -> impl IntoResponse {
357    use crate::rule::Rule;
358
359    // Ensure the rule_id in path matches the id in body
360    if req.id != rule_id {
361        return (
362            StatusCode::BAD_REQUEST,
363            Json(serde_json::json!({
364                "error": "Rule ID in path does not match ID in body",
365                "path_id": rule_id,
366                "body_id": req.id
367            })),
368        );
369    }
370
371    let rule = Rule {
372        id: req.id.clone(),
373        name: req.name.clone(),
374        predicate: req.predicate.clone(),
375        action: req.action.clone(),
376        window_seconds: req.window_seconds,
377        version: req.version.unwrap_or(1),
378        enabled: req.enabled.unwrap_or(true),
379    };
380
381    // Validate rule by attempting to compile it
382    let engine_lock = engine.read().await;
383    let schema = engine_lock.schema();
384    let evaluator = crate::evaluator::DataFusionEvaluator::new();
385
386    match evaluator.compile(rule.clone(), &schema) {
387        Ok(_) => {
388            // Validation successful
389        }
390        Err(e) => {
391            return (
392                StatusCode::BAD_REQUEST,
393                Json(serde_json::json!({
394                    "error": "Rule compilation failed",
395                    "message": e.to_string()
396                })),
397            );
398        }
399    }
400
401    drop(engine_lock);
402
403    // Update rule in engine
404    let mut engine_lock = engine.write().await;
405    match engine_lock.update_rule(&rule_id, rule.clone()).await {
406        Ok(()) => (
407            StatusCode::OK,
408            Json(serde_json::json!({
409                "message": "Rule updated successfully",
410                "rule": rule
411            })),
412        ),
413        Err(e) => (
414            StatusCode::NOT_FOUND,
415            Json(serde_json::json!({
416                "error": "Failed to update rule",
417                "message": e.to_string()
418            })),
419        ),
420    }
421}
422
423#[derive(serde::Deserialize)]
424struct PatchRuleRequest {
425    enabled: Option<bool>,
426    action: Option<String>,
427    name: Option<String>,
428    predicate: Option<String>,
429    window_seconds: Option<u64>,
430}
431
432async fn handle_patch_rule(
433    State(engine): State<SharedEngine>,
434    axum::extract::Path(rule_id): axum::extract::Path<String>,
435    Json(req): Json<PatchRuleRequest>,
436) -> impl IntoResponse {
437    let mut engine_lock = engine.write().await;
438
439    // Find rule
440    let rule_idx = engine_lock.rules.iter().position(|r| r.rule.id == rule_id);
441    if rule_idx.is_none() {
442        return (
443            StatusCode::NOT_FOUND,
444            Json(serde_json::json!({
445                "error": "Rule not found",
446                "rule_id": rule_id
447            })),
448        );
449    }
450
451    let rule_idx = rule_idx.unwrap();
452    let mut updated_rule = engine_lock.rules[rule_idx].rule.clone();
453
454    // Apply partial updates
455    if let Some(enabled) = req.enabled {
456        updated_rule.enabled = enabled;
457    }
458    if let Some(action) = req.action {
459        // Validate agent exists
460        if !engine_lock.agents.contains_key(&action) {
461            return (
462                StatusCode::BAD_REQUEST,
463                Json(serde_json::json!({
464                    "error": "Agent not found",
465                    "action": action
466                })),
467            );
468        }
469        updated_rule.action = action;
470    }
471    if let Some(name) = req.name {
472        updated_rule.name = name;
473    }
474    if let Some(predicate) = req.predicate {
475        // Validate predicate compiles
476        let schema = engine_lock.schema();
477        let evaluator = crate::evaluator::DataFusionEvaluator::new();
478        let test_rule = crate::rule::Rule {
479            id: updated_rule.id.clone(),
480            name: updated_rule.name.clone(),
481            predicate: predicate.clone(),
482            action: updated_rule.action.clone(),
483            window_seconds: updated_rule.window_seconds,
484            version: updated_rule.version,
485            enabled: updated_rule.enabled,
486        };
487        if evaluator.compile(test_rule, &schema).is_err() {
488            return (
489                StatusCode::BAD_REQUEST,
490                Json(serde_json::json!({
491                    "error": "Invalid predicate",
492                    "predicate": predicate
493                })),
494            );
495        }
496        updated_rule.predicate = predicate;
497    }
498    if let Some(window_seconds) = req.window_seconds {
499        updated_rule.window_seconds = Some(window_seconds);
500    }
501
502    // Update rule using update_rule method
503    match engine_lock
504        .update_rule(&rule_id, updated_rule.clone())
505        .await
506    {
507        Ok(()) => (
508            StatusCode::OK,
509            Json(serde_json::json!({
510                "message": "Rule updated successfully",
511                "rule": updated_rule
512            })),
513        ),
514        Err(e) => (
515            StatusCode::INTERNAL_SERVER_ERROR,
516            Json(serde_json::json!({
517                "error": "Failed to update rule",
518                "message": e.to_string()
519            })),
520        ),
521    }
522}
523
524async fn handle_delete_rule(
525    State(engine): State<SharedEngine>,
526    axum::extract::Path(rule_id): axum::extract::Path<String>,
527) -> impl IntoResponse {
528    let mut engine_lock = engine.write().await;
529
530    let rule_idx = engine_lock.rules.iter().position(|r| r.rule.id == rule_id);
531    if let Some(idx) = rule_idx {
532        engine_lock.rules.remove(idx);
533        engine_lock.window_buffers.remove(&rule_id);
534        (
535            StatusCode::OK,
536            Json(serde_json::json!({
537                "message": "Rule deleted successfully",
538                "rule_id": rule_id
539            })),
540        )
541    } else {
542        (
543            StatusCode::NOT_FOUND,
544            Json(serde_json::json!({
545                "error": "Rule not found",
546                "rule_id": rule_id
547            })),
548        )
549    }
550}
551
552// State Introspection API
553
554async fn handle_state(State(engine): State<SharedEngine>) -> impl IntoResponse {
555    let engine_lock = engine.read().await;
556    let mut states = Vec::new();
557
558    for rule in &engine_lock.rules {
559        let last_result = engine_lock
560            .state
561            .get_last_result(&rule.rule.id)
562            .await
563            .unwrap_or(crate::state::PredicateResult::False);
564
565        let window_size = engine_lock
566            .window_buffers
567            .get(&rule.rule.id)
568            .map(|b| {
569                b.get_batches()
570                    .iter()
571                    .map(|batch| batch.num_rows())
572                    .sum::<usize>()
573            })
574            .unwrap_or(0);
575
576        states.push(serde_json::json!({
577            "rule_id": rule.rule.id,
578            "rule_name": rule.rule.name,
579            "current_state": match last_result {
580                crate::state::PredicateResult::True => "active",
581                crate::state::PredicateResult::False => "inactive",
582            },
583            "window_size": window_size,
584            "enabled": rule.rule.enabled,
585        }));
586    }
587
588    (
589        StatusCode::OK,
590        Json(serde_json::json!({ "states": states })),
591    )
592}
593
594async fn handle_rule_state(
595    State(engine): State<SharedEngine>,
596    axum::extract::Path(rule_id): axum::extract::Path<String>,
597) -> impl IntoResponse {
598    let engine_lock = engine.read().await;
599
600    // Find rule
601    let rule = engine_lock.rules.iter().find(|r| r.rule.id == rule_id);
602    if rule.is_none() {
603        return (
604            StatusCode::NOT_FOUND,
605            Json(serde_json::json!({
606                "error": "Rule not found",
607                "rule_id": rule_id
608            })),
609        );
610    }
611
612    let rule = rule.unwrap();
613    let last_result = engine_lock
614        .state
615        .get_last_result(&rule_id)
616        .await
617        .unwrap_or(crate::state::PredicateResult::False);
618
619    let last_transition = engine_lock
620        .state
621        .get_last_transition_time(&rule_id)
622        .await
623        .ok()
624        .flatten();
625
626    let window_size = engine_lock
627        .window_buffers
628        .get(&rule_id)
629        .map(|b| {
630            b.get_batches()
631                .iter()
632                .map(|batch| batch.num_rows())
633                .sum::<usize>()
634        })
635        .unwrap_or(0);
636
637    // Get activation count from metrics
638    let metrics = crate::metrics::METRICS.snapshot();
639    let activation_count = metrics.rule_activations.get(&rule_id).copied().unwrap_or(0);
640
641    let mut response = serde_json::json!({
642        "rule_id": rule_id,
643        "rule_name": rule.rule.name,
644        "current_state": match last_result {
645            crate::state::PredicateResult::True => "active",
646            crate::state::PredicateResult::False => "inactive",
647        },
648        "activation_count": activation_count,
649        "window_size": window_size,
650        "enabled": rule.rule.enabled,
651        "timestamp": chrono::Utc::now().to_rfc3339(),
652    });
653
654    if let Some(transition_time) = last_transition {
655        response.as_object_mut().unwrap().insert(
656            "last_transition".to_string(),
657            serde_json::Value::String(transition_time.to_rfc3339()),
658        );
659    }
660
661    (StatusCode::OK, Json(response))
662}
663
664async fn shutdown_signal(engine: SharedEngine, config_path: String) {
665    // Add a small delay to ensure logs can flush
666    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
667    info!("Setting up shutdown signal handlers");
668
669    // Spawn reload handler as a background task (it runs forever)
670    #[cfg(unix)]
671    {
672        let engine_clone = engine.clone();
673        let config_path_clone = config_path.clone();
674        tokio::spawn(async move {
675            let mut stream = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup())
676                .expect("failed to install SIGHUP handler");
677            info!("SIGHUP handler installed");
678            // Process SIGHUP signals in a loop - this task runs forever
679            loop {
680                // Wait for SIGHUP signal
681                if stream.recv().await.is_none() {
682                    // Stream closed unexpectedly - this shouldn't happen, but if it does,
683                    // we'll just wait forever to prevent shutdown
684                    warn!("SIGHUP signal stream closed unexpectedly");
685                    std::future::pending::<()>().await;
686                }
687
688                info!("SIGHUP received, reloading configuration...");
689                match crate::config::FuseRuleConfig::from_file(&config_path_clone) {
690                    Ok(new_config) => {
691                        let mut engine_lock = engine_clone.write().await;
692                        if let Err(e) = engine_lock.reload_from_config(new_config).await {
693                            error!("Failed to reload engine: {}", e);
694                        }
695                    }
696                    Err(e) => {
697                        error!("Failed to load config file for reload: {}", e);
698                    }
699                }
700            }
701        });
702    }
703
704    let ctrl_c = async {
705        info!("Waiting for Ctrl+C signal...");
706        tokio::signal::ctrl_c()
707            .await
708            .expect("failed to install Ctrl+C handler");
709        info!("Termination signal received (Ctrl+C)");
710    };
711
712    #[cfg(unix)]
713    let terminate = async {
714        info!("Waiting for SIGTERM signal...");
715        let mut stream = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
716            .expect("failed to install signal handler");
717        info!("SIGTERM handler installed, waiting for signal...");
718        // Wait for SIGTERM - this future only completes when we actually receive the signal
719        match stream.recv().await {
720            Some(_) => {
721                info!("Termination signal received (SIGTERM)");
722            }
723            None => {
724                // Stream closed unexpectedly - wait forever to prevent shutdown
725                warn!("SIGTERM signal stream closed unexpectedly - waiting indefinitely");
726                std::future::pending::<()>().await;
727            }
728        }
729    };
730
731    #[cfg(not(unix))]
732    let terminate = std::future::pending::<()>();
733
734    info!("Entering shutdown signal select loop - server will run until Ctrl+C or SIGTERM");
735    tokio::select! {
736        _ = ctrl_c => {
737            info!("Ctrl+C branch selected - shutting down");
738        },
739        _ = terminate => {
740            info!("SIGTERM branch selected - shutting down");
741        },
742    }
743    info!("Shutdown signal handler completed");
744}
745
746async fn handle_status() -> (StatusCode, Json<Value>) {
747    (
748        StatusCode::OK,
749        Json(serde_json::json!({ "status": "active" })),
750    )
751}
752
753async fn handle_health(State(engine): State<SharedEngine>) -> impl IntoResponse {
754    let engine_lock = engine.read().await;
755    let rule_count = engine_lock.rules.len();
756    let enabled_rules = engine_lock.rules.iter().filter(|r| r.rule.enabled).count();
757    let agent_count = engine_lock.agents.len();
758
759    let health = serde_json::json!({
760        "status": "healthy",
761        "engine": {
762            "rules_total": rule_count,
763            "rules_enabled": enabled_rules,
764            "rules_disabled": rule_count - enabled_rules,
765            "agents": agent_count,
766        },
767        "timestamp": chrono::Utc::now().to_rfc3339(),
768    });
769
770    (StatusCode::OK, Json(health))
771}
772
773async fn handle_rules(State(engine): State<SharedEngine>) -> impl IntoResponse {
774    let engine_lock = engine.read().await;
775    let rules: Vec<Value> = engine_lock
776        .rules
777        .iter()
778        .map(|r| {
779            serde_json::json!({
780                "id": r.rule.id,
781                "name": r.rule.name,
782                "predicate": r.rule.predicate,
783                "action": r.rule.action,
784                "window_seconds": r.rule.window_seconds,
785                "version": r.rule.version,
786                "enabled": r.rule.enabled,
787            })
788        })
789        .collect();
790
791    (StatusCode::OK, Json(serde_json::json!({ "rules": rules })))
792}
793
794async fn handle_metrics() -> String {
795    crate::metrics::METRICS.to_prometheus()
796}
797
798async fn handle_ingest_with_rate_limit(
799    State(engine): State<SharedEngine>,
800    Json(payload): Json<Value>,
801    rate_limiter: Option<Arc<RateLimiter>>,
802) -> (StatusCode, Json<Value>) {
803    // Apply rate limiting if configured
804    if let Some(limiter) = rate_limiter {
805        if !limiter.allow().await {
806            return (
807                StatusCode::TOO_MANY_REQUESTS,
808                Json(serde_json::json!({
809                    "error": "Rate limit exceeded",
810                    "message": "Too many requests. Please try again later."
811                })),
812            );
813        }
814    }
815
816    handle_ingest(State(engine), Json(payload)).await
817}
818
819async fn handle_ingest(
820    State(engine): State<SharedEngine>,
821    Json(payload): Json<Value>,
822) -> (StatusCode, Json<Value>) {
823    let request_id = Uuid::new_v4().to_string();
824    debug!(request_id = %request_id, "Received ingest request");
825
826    // 1. Convert JSON to Arrow RecordBatch
827    // arrow-json ReaderBuilder expects NDJSON format (one JSON object per line)
828    // or a single object, not a JSON array. Convert arrays to NDJSON format.
829    let json_data = if payload.is_array() {
830        // Convert array to NDJSON: each object on a newline
831        let array = payload.as_array().unwrap();
832        let mut ndjson = String::new();
833        for (i, item) in array.iter().enumerate() {
834            if i > 0 {
835                ndjson.push('\n');
836            }
837            ndjson.push_str(&serde_json::to_string(item).unwrap_or_default());
838        }
839        ndjson.into_bytes()
840    } else {
841        // Single object - serialize as-is
842        match serde_json::to_vec(&payload) {
843            Ok(data) => data,
844            Err(e) => {
845                error!(request_id = %request_id, error = %e, "Failed to serialize payload");
846                return (
847                    StatusCode::BAD_REQUEST,
848                    Json(serde_json::json!({
849                        "error": format!("Invalid JSON payload: {}", e),
850                        "request_id": request_id
851                    })),
852                );
853            }
854        }
855    };
856    let cursor = Cursor::new(json_data);
857
858    let mut engine_lock = engine.write().await;
859    let schema = engine_lock.schema();
860
861    // 2. Validate schema before processing
862    let reader = match ReaderBuilder::new(schema.clone()).build(cursor) {
863        Ok(r) => r,
864        Err(e) => {
865            error!(request_id = %request_id, error = %e, "Schema validation failed");
866            return (
867                StatusCode::BAD_REQUEST,
868                Json(serde_json::json!({
869                    "error": format!("Schema validation failed: {}. Expected schema: {:?}", e, schema),
870                    "request_id": request_id
871                })),
872            );
873        }
874    };
875
876    let mut all_traces = Vec::new();
877    let iter = reader.into_iter();
878    for batch_result in iter {
879        match batch_result {
880            Ok(batch) => {
881                debug!(
882                    request_id = %request_id,
883                    rows = batch.num_rows(),
884                    "Ingested batch"
885                );
886
887                // Schema evolution: validate and handle schema changes
888                let batch_schema = batch.schema();
889                if batch_schema != schema {
890                    // Check if it's a compatible evolution (new fields added)
891                    let mut compatible = true;
892                    let expected_fields: std::collections::HashSet<_> =
893                        schema.fields().iter().map(|f| f.name()).collect();
894                    let actual_fields: std::collections::HashSet<_> =
895                        batch_schema.fields().iter().map(|f| f.name()).collect();
896
897                    // Check if all expected fields are present (allowing new fields)
898                    for field_name in &expected_fields {
899                        if !actual_fields.contains(field_name) {
900                            compatible = false;
901                            break;
902                        }
903                    }
904
905                    if compatible {
906                        info!(
907                            request_id = %request_id,
908                            new_fields = ?actual_fields.difference(&expected_fields).collect::<Vec<_>>(),
909                            "Schema evolution detected - new fields added"
910                        );
911                        // In a full implementation, we'd update the schema here
912                        // For now, we just log and continue
913                    } else {
914                        warn!(
915                            request_id = %request_id,
916                            expected = ?schema,
917                            actual = ?batch_schema,
918                            "Schema mismatch - missing required fields"
919                        );
920                    }
921                }
922
923                match engine_lock.process_batch(&batch).await {
924                    Ok(traces) => {
925                        debug!(
926                            request_id = %request_id,
927                            trace_count = traces.len(),
928                            "Engine processed batch"
929                        );
930                        // Add request_id to all traces
931                        let traces_with_id: Vec<Value> = traces
932                            .into_iter()
933                            .map(|t| {
934                                let mut trace_json = serde_json::to_value(&t).unwrap();
935                                if let Some(obj) = trace_json.as_object_mut() {
936                                    obj.insert(
937                                        "request_id".to_string(),
938                                        serde_json::json!(request_id),
939                                    );
940                                }
941                                trace_json
942                            })
943                            .collect();
944                        all_traces.extend(traces_with_id);
945                    }
946                    Err(e) => {
947                        error!(
948                            request_id = %request_id,
949                            error = %e,
950                            "Engine processing error"
951                        );
952                        return (
953                            StatusCode::INTERNAL_SERVER_ERROR,
954                            Json(serde_json::json!({
955                                "error": format!("Engine error: {}", e),
956                                "request_id": request_id
957                            })),
958                        );
959                    }
960                }
961            }
962            Err(e) => {
963                error!(
964                    request_id = %request_id,
965                    error = %e,
966                    "Reader error"
967                );
968                return (
969                    StatusCode::BAD_REQUEST,
970                    Json(serde_json::json!({
971                        "error": format!("JSON Reader error: {}", e),
972                        "request_id": request_id
973                    })),
974                );
975            }
976        }
977    }
978
979    (
980        StatusCode::OK,
981        Json(serde_json::json!({
982            "message": "Processed",
983            "request_id": request_id,
984            "traces": all_traces
985        })),
986    )
987}